Author: cliffjansen
Date: Thu Feb  9 16:31:45 2012
New Revision: 1242380

URL: http://svn.apache.org/viewvc?rev=1242380&view=rev
Log:
QPID-3571 PosixPoller implementation

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp

Added: qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp?rev=1242380&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp Thu Feb  9 16:31:45 
2012
@@ -0,0 +1,793 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicCount.h"
+#include "qpid/sys/DeletionManager.h"
+#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Condition.h"
+
+#include <poll.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+#include <exception>
+
+/*
+ *
+ * This is a qpid::sys::Poller implementation for Posix systems.
+ *
+ * This module follows the structure of the Linux EpollPoller as closely as 
possible
+ * to simplify maintainability.  Noteworthy differences:
+ *
+ * The Linux epoll_xxx() calls present one event at a time to multiple callers 
whereas poll()
+ * returns one or more events to a single caller.  The EventStream class 
layers a
+ * "one event per call" view of the poll() result to multiple threads.
+ *
+ * The HandleSet is the master set of in-use PollerHandles.  The EventStream
+ * maintains a snapshot copy taken just before the call to poll() that remains 
static
+ * until all flagged events have been processed.
+ *
+ * There is an additional window where the PollerHandlePrivate class may 
survive the
+ * parent PollerHandle destructor, i.e. between snapshots.
+ *
+ * Safe interrupting of the Poller is implemented using the "self-pipe trick".
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+// Deletion manager to handle deferring deletion of PollerHandles to when they 
definitely aren't being used
+DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
+
+//  Instantiate (and define) class static for DeletionManager
+template <>
+DeletionManager<PollerHandlePrivate>::AllThreadsStatuses 
DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
+
+class PollerHandlePrivate {
+    friend class Poller;
+    friend class PollerPrivate;
+    friend class PollerHandle;
+    friend class HandleSet;
+
+    enum FDStat {
+        ABSENT,
+        MONITORED,
+        INACTIVE,
+        HUNGUP,
+        MONITORED_HUNGUP,
+        INTERRUPTED,
+        INTERRUPTED_HUNGUP,
+        DELETED
+    };
+
+    short events;
+    const IOHandlePrivate* ioHandle;
+    PollerHandle* pollerHandle;
+    FDStat stat;
+    Mutex lock;
+
+    PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
+      events(0),
+      ioHandle(h),
+      pollerHandle(p),
+      stat(ABSENT) {
+    }
+
+    int fd() const {
+        return toFd(ioHandle);
+    }
+
+    bool isActive() const {
+        return stat == MONITORED || stat == MONITORED_HUNGUP;
+    }
+
+    void setActive() {
+        stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
+            ? MONITORED_HUNGUP
+            : MONITORED;
+    }
+
+    bool isInactive() const {
+        return stat == INACTIVE || stat == HUNGUP;
+    }
+
+    void setInactive() {
+        stat = INACTIVE;
+    }
+
+    bool isIdle() const {
+        return stat == ABSENT;
+    }
+
+    void setIdle() {
+        stat = ABSENT;
+    }
+
+    bool isHungup() const {
+        return
+            stat == MONITORED_HUNGUP ||
+            stat == HUNGUP ||
+            stat == INTERRUPTED_HUNGUP;
+    }
+
+    void setHungup() {
+        assert(stat == MONITORED);
+        stat = HUNGUP;
+    }
+
+    bool isInterrupted() const {
+        return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
+    }
+
+    void setInterrupted() {
+        stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
+            ? INTERRUPTED_HUNGUP
+            : INTERRUPTED;
+    }
+
+    bool isDeleted() const {
+        return stat == DELETED;
+    }
+
+    void setDeleted() {
+        stat = DELETED;
+    }
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :
+    impl(new PollerHandlePrivate(h.impl, this))
+{}
+
+PollerHandle::~PollerHandle() {
+    {
+    ScopedLock<Mutex> l(impl->lock);
+    if (impl->isDeleted()) {
+        return;
+    }
+    impl->pollerHandle = 0;
+    if (impl->isInterrupted()) {
+        impl->setDeleted();
+        return;
+    }
+    assert(impl->isIdle());
+    impl->setDeleted();
+    }
+    PollerHandleDeletionManager.markForDeletion(impl);
+}
+
+class HandleSet
+{
+    Mutex lock;
+    bool stale;
+    std::set<PollerHandlePrivate*> handles;
+  public:
+    HandleSet() : stale(true) {}
+    void add(PollerHandlePrivate*);
+    void remove(PollerHandlePrivate*);
+    void cleanup();
+    bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct 
::pollfd>&);
+    void setStale();
+};
+
+void HandleSet::add(PollerHandlePrivate* h)
+{
+    ScopedLock<Mutex> l(lock);
+    handles.insert(h);
+}
+void HandleSet::remove(PollerHandlePrivate* h)
+{
+    ScopedLock<Mutex> l(lock);
+    handles.erase(h);
+}
+void HandleSet::cleanup()
+{
+    // Inform all registered handles of disconnection
+    std::set<PollerHandlePrivate*> copy;
+    handles.swap(copy);
+    for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != 
copy.end(); ++i) {
+        PollerHandlePrivate& eh = **i;
+        {
+            ScopedLock<Mutex> l(eh.lock);
+            if (!eh.isDeleted()) {
+                Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED);
+                event.process();
+            }
+        }
+    }
+}
+void HandleSet::setStale()
+{
+    // invalidate cached pollfds for next snapshot
+    ScopedLock<Mutex> l(lock);
+    stale = true;
+}
+
+/**
+ * Concrete implementation of Poller to use Posix poll()
+ * interface
+ */
+class PollerPrivate {
+    friend class Poller;
+    friend class EventStream;
+    friend class HandleSet;
+
+    class SignalPipe {
+        /**
+         * Used to wakeup a thread in ::poll()
+         */
+        int fds[2];
+        bool signaled;
+        bool permanent;
+        Mutex lock;
+    public:
+        SignalPipe() : signaled(false), permanent(false) {
+            QPID_POSIX_CHECK(::pipe(fds));
+        }
+
+        ~SignalPipe() {
+            ::close(fds[0]);
+            ::close(fds[1]);
+        }
+
+        int getFD() {
+            return fds[0];
+        }
+
+        bool isSet() {
+            return signaled;
+        }
+
+        void set() {
+            ScopedLock<Mutex> l(lock);
+            if (signaled)
+                return;
+            signaled = true;
+            QPID_POSIX_CHECK(::write(fds[1], " ", 1));
+        }
+
+        void reset() {
+            if (permanent)
+                return;
+            ScopedLock<Mutex> l(lock);
+            if (signaled) {
+                char ignore;
+                QPID_POSIX_CHECK(::read(fds[0], &ignore, 1));
+                signaled = false;
+            }
+        }
+
+        void setPermanently() {
+            // async signal safe calls only.  No locking.
+            permanent = true;
+            signaled = true;
+            QPID_POSIX_CHECK(::write(fds[1], "  ", 2));
+            // poll() should never block now
+        }
+    };
+
+    // Collect pending events and serialize access.  Maintain array of pollfd 
structs.
+    class EventStream {
+        typedef Poller::Event Event;
+        PollerPrivate& pollerPrivate;
+        SignalPipe& signalPipe;
+        std::queue<PollerHandlePrivate*> interruptedHandles;
+        std::vector<struct ::pollfd> pollfds;
+        std::vector<PollerHandlePrivate*> pollHandles;
+        Mutex streamLock;
+        Mutex serializeLock;
+        Condition serializer;
+        bool busy;
+        int currentPollfd;
+        int pollCount;
+        int waiters;
+
+    public:
+
+        EventStream(PollerPrivate* p) : pollerPrivate(*p), 
signalPipe(p->signalPipe), busy(false),
+                                        currentPollfd(0), pollCount(0), 
waiters(0) {
+            // The signal pipe is the first element of pollfds and pollHandles
+            pollfds.reserve(8);
+            pollfds.resize(1);
+            pollfds[0].fd = pollerPrivate.signalPipe.getFD();
+            pollfds[0].events = POLLIN;
+            pollfds[0].revents = 0;
+
+            pollHandles.reserve(8);
+            pollHandles.resize(1);
+            pollHandles[0] = 0;
+        }
+
+        void addInterrupt(PollerHandle& handle) {
+            ScopedLock<Mutex> l(streamLock);
+            interruptedHandles.push(handle.impl);
+        }
+
+        // Serialize access to the stream.
+        Event next(Duration timeout) {
+            AbsTime targetTimeout =
+                (timeout == TIME_INFINITE) ?
+                FAR_FUTURE :
+                AbsTime(now(), timeout);
+
+
+            ScopedLock<Mutex> l(serializeLock);
+            Event event(0, Poller::INVALID);
+            while (busy) {
+                waiters++;
+                bool timedout = !serializer.wait(serializeLock, targetTimeout);
+                waiters--;
+
+                if (busy && timedout) {
+                    return Event(0, Poller::TIMEOUT);
+                }
+            }
+            busy = true;
+            {
+                ScopedUnlock<Mutex> ul(serializeLock);
+                event = getEvent(targetTimeout);
+            }
+            busy = false;
+
+            if (waiters > 0)
+                serializer.notify();
+            return event;
+        }
+
+        Event getEvent(AbsTime targetTimeout) {
+            bool timeoutPending = false;
+
+            ScopedLock<Mutex> l(streamLock); // hold lock except for poll()
+
+            // loop until poll event, async interrupt, or timeout
+            while (true) {
+
+                // first check for any interrupts
+                while (interruptedHandles.size() > 0) {
+                    PollerHandlePrivate& eh = *interruptedHandles.front();
+                    interruptedHandles.pop();
+                    {
+                        ScopedLock<Mutex> lk(eh.lock);
+                        if (!eh.isDeleted()) {
+                            if (!eh.isIdle()) {
+                                eh.setInactive();
+                            }
+
+                            // nullify the corresponding pollfd event, if any
+                            int ehfd = eh.fd();
+                            std::vector<struct ::pollfd>::iterator i = 
pollfds.begin() + 1; // skip self pipe at front
+                            for (; i != pollfds.end(); i++) {
+                                if (i->fd == ehfd) {
+                                    i->events = 0;
+                                    if (i->revents) {
+                                        i->revents = 0;
+                                        pollCount--;
+                                    }
+                                    break;
+                                }
+                            }
+                            return Event(eh.pollerHandle, Poller::INTERRUPTED);
+                        }
+                    }
+                    PollerHandleDeletionManager.markForDeletion(&eh);
+                }
+
+                // Check for shutdown
+                if (pollerPrivate.isShutdown) {
+                    PollerHandleDeletionManager.markAllUnusedInThisThread();
+                    return Event(0, Poller::SHUTDOWN);
+                }
+
+                // search for any remaining events from earlier poll()
+                int nfds = pollfds.size();
+                while ((pollCount > 0) && (currentPollfd < nfds)) {
+                    int index = currentPollfd++;
+                    short evt = pollfds[index].revents;
+                    if (evt != 0) {
+                        pollCount--;
+                        PollerHandlePrivate& eh = *pollHandles[index];
+                        ScopedLock<Mutex> l(eh.lock);
+                        // stop polling this handle until resetMode()
+                        pollfds[index].events = 0;
+
+                        // the handle could have gone inactive since snapshot 
taken
+                        if (eh.isActive()) {
+                            PollerHandle* handle = eh.pollerHandle;
+                            assert(handle);
+
+                            // If the connection has been hungup we could 
still be readable
+                            // (just not writable), allow us to readable until 
we get here again
+                            if (evt & POLLHUP) {
+                                if (eh.isHungup()) {
+                                    eh.setInactive();
+                                    // Don't set up last Handle so that we 
don't reset this handle
+                                    // on re-entering Poller::wait. This means 
that we will never
+                                    // be set active again once we've returned 
disconnected, and so
+                                    // can never be returned again.
+                                    return Event(handle, Poller::DISCONNECTED);
+                                }
+                                eh.setHungup();
+                            } else {
+                                eh.setInactive();
+                            }
+                            return Event(handle, 
PollerPrivate::epollToDirection(evt));
+                        }
+                    }
+                }
+
+                if (timeoutPending) {
+                    return Event(0, Poller::TIMEOUT);
+                }
+
+                // no outstanding events, poll() for more
+                {
+                    ScopedUnlock<Mutex> ul(streamLock);
+
+                    bool refreshed = 
pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
+                    if (refreshed) {
+                        // we just drained all interruptedHandles and got a 
fresh snapshot
+                        
PollerHandleDeletionManager.markAllUnusedInThisThread();
+                    }
+
+                    if (!signalPipe.isSet()) {
+                        int timeoutMs = -1;
+                        if (!(targetTimeout == FAR_FUTURE)) {
+                            timeoutMs = Duration(now(), targetTimeout) / 
TIME_MSEC;
+                            if (timeoutMs < 0)
+                                timeoutMs = 0;
+                        }
+
+                        pollCount = ::poll(&pollfds[0], pollfds.size(), 
timeoutMs);
+
+                        if (pollCount ==-1 && errno != EINTR) {
+                            QPID_POSIX_CHECK(pollCount);
+                        }
+                        else if (pollCount == 0) {
+                            // timeout, unless shutdown or interrupt arrives 
in another thread
+                            timeoutPending = true;
+                        }
+                        else {
+                            if (pollfds[0].revents) {
+                                pollCount--; // signal pipe doesn't count
+                            }
+                        }
+                    }
+                    else
+                        pollCount = 0;
+                    signalPipe.reset();
+                }
+                currentPollfd = 1;
+            }
+        }
+    };
+
+    bool isShutdown;
+    HandleSet registeredHandles;
+    AtomicCount threadCount;
+    SignalPipe signalPipe;
+    EventStream eventStream;
+
+    static short directionToEpollEvent(Poller::Direction dir) {
+        switch (dir) {
+            case Poller::INPUT:  return POLLIN;
+            case Poller::OUTPUT: return POLLOUT;
+            case Poller::INOUT:  return POLLIN | POLLOUT;
+            default: return 0;
+        }
+    }
+
+    static Poller::EventType epollToDirection(short events) {
+        // POLLOUT & POLLHUP are mutually exclusive really, but at least 
socketpairs
+        // can give you both!
+        events = (events & POLLHUP) ? events & ~POLLOUT : events;
+        short e = events & (POLLIN | POLLOUT);
+        switch (e) {
+            case POLLIN: return Poller::READABLE;
+            case POLLOUT: return Poller::WRITABLE;
+            case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
+            default:
+              return (events & (POLLHUP | POLLERR)) ?
+                    Poller::DISCONNECTED : Poller::INVALID;
+        }
+    }
+
+    PollerPrivate() :
+        isShutdown(false), eventStream(this) {
+    }
+
+    ~PollerPrivate() {}
+
+    void resetMode(PollerHandlePrivate& handle);
+
+    void interrupt() {
+        signalPipe.set();
+    }
+
+    void interruptAll() {
+        // be async signal safe
+        signalPipe.setPermanently();
+    }
+};
+
+
+void Poller::registerHandle(PollerHandle& handle) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(eh.isIdle());
+
+    eh.setActive();
+    impl->registeredHandles.add(handle.impl);
+    // not stale until monitored
+}
+
+void Poller::unregisterHandle(PollerHandle& handle) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(!eh.isIdle());
+
+    eh.setIdle();
+    impl->registeredHandles.remove(handle.impl);
+    impl->registeredHandles.setStale();
+    impl->interrupt();
+}
+
+void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
+    PollerHandle* ph;
+    {
+        // Called after an event has been processed for a handle
+        ScopedLock<Mutex> l(eh.lock);
+        assert(!eh.isActive());
+
+        if (eh.isIdle() || eh.isDeleted()) {
+            return;
+        }
+
+        if (eh.events==0) {
+            eh.setActive();
+            return;
+        }
+
+        if (!eh.isInterrupted()) {
+            // Handle still in use, allow events to resume.
+            eh.setActive();
+            registeredHandles.setStale();
+            // Ouch. This scales poorly for large handle sets.
+            // TODO: avoid new snapshot, perhaps create an index to pollfds or 
a
+            // pending reset queue to be processed before each poll().  
However, the real
+            // scalable solution is to implement the OS-specific epoll 
equivalent.
+            interrupt();
+            return;
+        }
+        ph = eh.pollerHandle;
+    }
+
+    eventStream.addInterrupt(*ph);
+    interrupt();
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(!eh.isIdle());
+
+    short oldEvents = eh.events;
+    eh.events |= PollerPrivate::directionToEpollEvent(dir);
+
+    // If no change nothing more to do - avoid unnecessary system call
+    if (oldEvents==eh.events) {
+        return;
+    }
+
+    // If we're not actually listening wait till we are to perform change
+    if (!eh.isActive()) {
+        return;
+    }
+
+    // tell polling thread to update its pollfds
+    impl->registeredHandles.setStale();
+    impl->interrupt();
+}
+
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(!eh.isIdle());
+
+    short oldEvents = eh.events;
+    eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
+
+    // If no change nothing more to do - avoid unnecessary system call
+    if (oldEvents==eh.events) {
+        return;
+    }
+
+    // If we're not actually listening wait till we are to perform change
+    if (!eh.isActive()) {
+        return;
+    }
+
+    impl->registeredHandles.setStale();
+    impl->interrupt();
+}
+
+void Poller::shutdown() {
+    // NB: this function must be async-signal safe, it must not
+    // call any function that is not async-signal safe.
+
+    // Allow sloppy code to shut us down more than once
+    if (impl->isShutdown)
+        return;
+
+    // Don't use any locking here - isShutdown will be visible to all
+    // after the write() anyway (it's a memory barrier)
+    impl->isShutdown = true;
+
+    impl->interruptAll();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+    {
+        PollerHandlePrivate& eh = *handle.impl;
+        ScopedLock<Mutex> l(eh.lock);
+        if (eh.isIdle() || eh.isDeleted()) {
+            return false;
+        }
+
+        if (eh.isInterrupted()) {
+            return true;
+        }
+
+        if (eh.isInactive()) {
+            eh.setInterrupted();
+            return true;
+        }
+        eh.setInterrupted();
+        eh.events = 0;
+    }
+
+    impl->registeredHandles.setStale();
+    impl->eventStream.addInterrupt(handle);
+    impl->interrupt();
+    return true;
+}
+
+void Poller::run() {
+    // Ensure that we exit thread responsibly under all circumstances
+    try {
+        // Make sure we can't be interrupted by signals at a bad time
+        ::sigset_t ss;
+        ::sigfillset(&ss);
+        ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+        ++(impl->threadCount);
+        do {
+            Event event = wait();
+
+            // If can read/write then dispatch appropriate callbacks
+            if (event.handle) {
+                event.process();
+            } else {
+                // Handle shutdown
+                switch (event.type) {
+                case SHUTDOWN:
+                    //last thread to respond to shutdown cleans up:
+                    if (--(impl->threadCount) == 0) 
impl->registeredHandles.cleanup();
+                    PollerHandleDeletionManager.destroyThreadState();
+                    return;
+                default:
+                    // This should be impossible
+                    assert(false);
+                }
+            }
+        } while (true);
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "IO worker thread exiting with unhandled exception: " 
<< e.what());
+    }
+    PollerHandleDeletionManager.destroyThreadState();
+    --(impl->threadCount);
+}
+
+bool Poller::hasShutdown()
+{
+    return impl->isShutdown;
+}
+
+Poller::Event Poller::wait(Duration timeout) {
+    static __thread PollerHandlePrivate* lastReturnedHandle = 0;
+
+    if (lastReturnedHandle) {
+        impl->resetMode(*lastReturnedHandle);
+        lastReturnedHandle = 0;
+    }
+
+    Event event = impl->eventStream.next(timeout);
+
+    switch (event.type) {
+    case INTERRUPTED:
+    case READABLE:
+    case WRITABLE:
+    case READ_WRITABLE:
+        lastReturnedHandle = event.handle->impl;
+        break;
+    default:
+        ;
+    }
+
+    return event;
+}
+
+// Concrete constructors
+Poller::Poller() :
+    impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+    delete impl;
+}
+
+
+bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs , 
std::vector<struct ::pollfd>& fds)
+{
+    // Element 0 of the vectors is always the signal pipe, leave undisturbed
+    {
+        ScopedLock<Mutex> l(lock);
+        if (!stale)
+            return false;       // no refresh done
+
+        hs.resize(1);
+        for (std::set<PollerHandlePrivate*>::const_iterator i = 
handles.begin(); i != handles.end(); ++i) {
+            hs.push_back(*i);
+        }
+        stale = false;
+        // have copy of handle set (in vector form), drop the lock and build 
the pollfds
+    }
+
+    // sync pollfds to same sizing as the handles
+    int sz = hs.size();
+    fds.resize(sz);
+
+    for (int j = 1; j < sz; ++j) {
+        // create a pollfd entry for each handle
+        struct ::pollfd& pollfd = fds[j];
+        PollerHandlePrivate& eh = *hs[j];
+        ScopedLock<Mutex> lk(eh.lock);
+
+        if (!eh.isInactive() && !eh.isDeleted()) {
+            pollfd.fd = eh.fd();
+            pollfd.events = eh.events;
+        } else {
+            pollfd.fd = -1;     // tell poll() to ignore this fd
+            pollfd.events = 0;
+        }
+    }
+    return true;
+}
+
+
+}}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to