Author: ritchiem Date: Wed Oct 28 15:35:38 2009 New Revision: 830609 URL: http://svn.apache.org/viewvc?rev=830609&view=rev Log: Carry over recent AsynchIO-level changes to Windows.
Modified: qpid/branches/0.5.x-dev/qpid/cpp/include/qpid/sys/IOHandle.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Broker.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/Socket.cpp Modified: qpid/branches/0.5.x-dev/qpid/cpp/include/qpid/sys/IOHandle.h URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/include/qpid/sys/IOHandle.h?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/include/qpid/sys/IOHandle.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/include/qpid/sys/IOHandle.h Wed Oct 28 15:35:38 2009 @@ -35,6 +35,8 @@ class AsynchAcceptorPrivate; class AsynchAcceptResult; namespace windows { + class AsynchAcceptor; + class AsynchAcceptResult; class AsynchIO; } @@ -43,8 +45,8 @@ class IOHandlePrivate; class IOHandle { - friend class AsynchAcceptorPrivate; - friend class AsynchAcceptResult; + friend class windows::AsynchAcceptResult; + friend class windows::AsynchAcceptor; friend class windows::AsynchIO; friend class PollerHandle; @@ -52,7 +54,7 @@ protected: IOHandlePrivate* const impl; - IOHandle(IOHandlePrivate*); + IOHandle(IOHandlePrivate*); QPID_COMMON_EXTERN virtual ~IOHandle(); }; Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Broker.h?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Broker.h Wed Oct 28 15:35:38 2009 @@ -201,7 +201,7 @@ void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>); /** Accept connections */ - void accept(); + QPID_BROKER_EXTERN void accept(); /** Create a connection to another broker. */ void connect(const std::string& host, uint16_t port, Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Wed Oct 28 15:35:38 2009 @@ -74,6 +74,7 @@ namespace qpid { namespace sys { +namespace windows { /* * Asynch Acceptor @@ -88,13 +89,13 @@ * and status of each accept operation outstanding. */ -class AsynchAcceptorPrivate { +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { friend class AsynchAcceptResult; public: - AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback); - ~AsynchAcceptorPrivate(); + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); void start(Poller::shared_ptr poller); private: @@ -104,19 +105,7 @@ const Socket& socket; }; -AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : - impl(new AsynchAcceptorPrivate(s, callback)) -{} - -AsynchAcceptor::~AsynchAcceptor() -{ delete impl; } - -void AsynchAcceptor::start(Poller::shared_ptr poller) { - impl->start(poller); -} - -AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s, - AsynchAcceptor::Callback callback) +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), socket(s) { @@ -128,16 +117,17 @@ #endif } -AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) { +AsynchAcceptor::~AsynchAcceptor() +{ socket.close(); } -void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) { +void AsynchAcceptor::start(Poller::shared_ptr poller) { poller->monitorHandle(PollerHandle(socket), Poller::INPUT); restart (); } -void AsynchAcceptorPrivate::restart(void) { +void AsynchAcceptor::restart(void) { DWORD bytesReceived = 0; // Not used, needed for AcceptEx API AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, this, @@ -156,7 +146,7 @@ AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, - AsynchAcceptorPrivate *acceptor, + AsynchAcceptor *acceptor, SOCKET listener) : callback(cb), acceptor(acceptor), listener(listener) { newSocket.reset (new Socket()); @@ -174,13 +164,11 @@ } void AsynchAcceptResult::failure(int status) { - //if (status != WSA_OPERATION_ABORTED) - // Can there be anything else? ; - delete this; + //if (status != WSA_OPERATION_ABORTED) + // Can there be anything else? ; + delete this; } -namespace windows { - /* * AsynchConnector does synchronous connects for now... to do asynch the * IocpPoller will need some extension to register an event handle as a @@ -224,6 +212,12 @@ } // namespace windows +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new windows::AsynchAcceptor(s, callback); +} + AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, Poller::shared_ptr poller, std::string hostname, @@ -231,12 +225,12 @@ ConnectedCallback connCb, FailedCallback failCb) { - return new qpid::sys::windows::AsynchConnector(s, - poller, - hostname, - port, - connCb, - failCb); + return new windows::AsynchConnector(s, + poller, + hostname, + port, + connCb, + failCb); } Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Wed Oct 28 15:35:38 2009 @@ -30,6 +30,7 @@ namespace qpid { namespace sys { +namespace windows { /* * AsynchIoResult defines the class that receives the result of an @@ -73,14 +74,13 @@ int status; }; -class AsynchAcceptorPrivate; class AsynchAcceptResult : public AsynchResult { - friend class AsynchAcceptorPrivate; + friend class AsynchAcceptor; public: - AsynchAcceptResult(AsynchAcceptor::Callback cb, - AsynchAcceptorPrivate *acceptor, + AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, SOCKET listener); virtual void success (size_t bytesTransferred); virtual void failure (int error); @@ -89,8 +89,8 @@ virtual void complete(void) {} // No-op for this class. std::auto_ptr<qpid::sys::Socket> newSocket; - AsynchAcceptor::Callback callback; - AsynchAcceptorPrivate *acceptor; + qpid::sys::AsynchAcceptor::Callback callback; + AsynchAcceptor *acceptor; SOCKET listener; // AcceptEx needs a place to write the local and remote addresses @@ -106,16 +106,16 @@ typedef boost::function1<void, AsynchIoResult *> Completer; virtual ~AsynchIoResult() {} - AsynchIO::BufferBase *getBuff(void) const { return iobuff; } + qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; } size_t getRequested(void) const { return requested; } const WSABUF *getWSABUF(void) const { return &wsabuf; } protected: - void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; } + void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; } protected: AsynchIoResult(Completer cb, - AsynchIO::BufferBase *buff, size_t length) + qpid::sys::AsynchIO::BufferBase *buff, size_t length) : completionCallback(cb), iobuff(buff), requested(length) {} virtual void complete(void) = 0; @@ -123,7 +123,7 @@ Completer completionCallback; private: - AsynchIO::BufferBase *iobuff; + qpid::sys::AsynchIO::BufferBase *iobuff; size_t requested; // Number of bytes in original I/O request }; @@ -137,7 +137,7 @@ public: AsynchReadResult(AsynchIoResult::Completer cb, - AsynchIO::BufferBase *buff, + qpid::sys::AsynchIO::BufferBase *buff, size_t length) : AsynchIoResult(cb, buff, length) { wsabuf.buf = buff->bytes + buff->dataCount; @@ -149,7 +149,7 @@ // complete() updates buffer then does completion callback. virtual void complete(void) { - AsynchIO::BufferBase *b = getBuff(); + qpid::sys::AsynchIO::BufferBase *b = getBuff(); b->dataStart += bytes; b->dataCount -= bytes; completionCallback(this); @@ -157,7 +157,7 @@ public: AsynchWriteResult(AsynchIoResult::Completer cb, - AsynchIO::BufferBase *buff, + qpid::sys::AsynchIO::BufferBase *buff, size_t length) : AsynchIoResult(cb, buff, length) { wsabuf.buf = buff ? buff->bytes : 0; @@ -188,15 +188,15 @@ public: AsynchCallbackRequest(AsynchIoResult::Completer cb, - AsynchIO::RequestCallback reqCb) + qpid::sys::AsynchIO::RequestCallback reqCb) : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) { wsabuf.buf = 0; wsabuf.len = 0; } - AsynchIO::RequestCallback reqCallback; + qpid::sys::AsynchIO::RequestCallback reqCallback; }; -}} +}}} // qpid::sys::windows #endif /*!_windows_asynchIoResult_h*/ Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Wed Oct 28 15:35:38 2009 @@ -22,6 +22,7 @@ * */ +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/windows/AsynchIoResult.h" #include "qpid/CommonImportExport.h" @@ -40,13 +41,13 @@ class IOHandlePrivate { public: IOHandlePrivate(SOCKET f = INVALID_SOCKET, - AsynchIoResult::Completer cb = 0, + windows::AsynchIoResult::Completer cb = 0, AsynchIO::RequestCallback reqCallback = 0) : fd(f), event(cb), cbRequest(reqCallback) {} SOCKET fd; - AsynchIoResult::Completer event; + windows::AsynchIoResult::Completer event; AsynchIO::RequestCallback cbRequest; }; Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Wed Oct 28 15:35:38 2009 @@ -42,11 +42,11 @@ friend class PollerHandle; SOCKET fd; - AsynchIoResult::Completer cb; + windows::AsynchIoResult::Completer cb; AsynchIO::RequestCallback cbRequest; PollerHandlePrivate(SOCKET f, - AsynchIoResult::Completer cb0 = 0, + windows::AsynchIoResult::Completer cb0 = 0, AsynchIO::RequestCallback rcb = 0) : fd(f), cb(cb0), cbRequest(rcb) { @@ -133,13 +133,14 @@ assert(dir == Poller::INPUT || dir == Poller::OUTPUT); if (dir == Poller::OUTPUT) { - AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb); + windows::AsynchWriteWanted *result = + new windows::AsynchWriteWanted(handle.impl->cb); PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); } else { - AsynchCallbackRequest *result = - new AsynchCallbackRequest(handle.impl->cb, - handle.impl->cbRequest); + windows::AsynchCallbackRequest *result = + new windows::AsynchCallbackRequest(handle.impl->cb, + handle.impl->cbRequest); PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); } } @@ -155,7 +156,7 @@ DWORD numTransferred = 0; ULONG_PTR completionKey = 0; OVERLAPPED *overlapped = 0; - AsynchResult *result = 0; + windows::AsynchResult *result = 0; // Wait for either an I/O operation to finish (thus signaling the // IOCP handle) or a shutdown request to be made (thus signaling the @@ -185,7 +186,7 @@ return Event(0, SHUTDOWN); } - result = AsynchResult::from_overlapped(overlapped); + result = windows::AsynchResult::from_overlapped(overlapped); result->success (static_cast<size_t>(numTransferred)); } else { @@ -193,7 +194,7 @@ // Dequeued a completion for a failed operation. Downcast back // to the result object and inform it that the operation failed. DWORD status = ::GetLastError(); - result = AsynchResult::from_overlapped(overlapped); + result = windows::AsynchResult::from_overlapped(overlapped); result->failure (static_cast<int>(status)); } } Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp Wed Oct 28 15:35:38 2009 @@ -47,8 +47,8 @@ const boost::shared_ptr<sys::Poller>& poller); ~PollableConditionPrivate(); - void poke(); - void dispatch(AsynchIoResult *result); + void poke(); + void dispatch(windows::AsynchIoResult *result); private: PollableCondition::Callback cb; @@ -82,7 +82,7 @@ poller->monitorHandle(ph, Poller::INPUT); } -void PollableConditionPrivate::dispatch(AsynchIoResult *result) +void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result) { delete result; // Poller::monitorHandle() allocates this cb(parent); Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/Socket.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=830609&r1=830608&r2=830609&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/sys/windows/Socket.cpp Wed Oct 28 15:35:38 2009 @@ -135,7 +135,9 @@ } // namespace Socket::Socket() : - IOHandle(new IOHandlePrivate) + IOHandle(new IOHandlePrivate), + nonblocking(false), + nodelay(false) { SOCKET& socket = impl->fd; if (socket != INVALID_SOCKET) Socket::close(); @@ -145,7 +147,9 @@ } Socket::Socket(IOHandlePrivate* h) : - IOHandle(h) + IOHandle(h), + nonblocking(false), + nodelay(false) {} void @@ -162,6 +166,7 @@ try { if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); } catch (std::exception&) { closesocket(s); socket = INVALID_SOCKET; @@ -313,17 +318,16 @@ return result; } -void Socket::setTcpNoDelay(bool nodelay) const +void Socket::setTcpNoDelay() const { - if (nodelay) { - int flag = 1; - int result = setsockopt(impl->fd, - IPPROTO_TCP, - TCP_NODELAY, - (char *)&flag, - sizeof(flag)); - QPID_WINSOCK_CHECK(result); - } + int flag = 1; + int result = setsockopt(impl->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + nodelay = true; } }} // namespace qpid::sys --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org