Feedback and/or help with testing is welcome. - Jon
On Aug 28, 2014, at 1:36 PM, Jonathan Siwek <[email protected]> wrote: > Repository : ssh://[email protected]/bro > > On branch : topic/jsiwek/improve_comm_loop > Link : > https://github.com/bro/bro/commit/675fba3fdee0a391cfb6fc52d07b08caaca96c76 > >> --------------------------------------------------------------- > > commit 675fba3fdee0a391cfb6fc52d07b08caaca96c76 > Author: Jon Siwek <[email protected]> > Date: Thu Aug 28 13:13:30 2014 -0500 > > Remove timeouts from remote communication loop. > > The select() now blocks until there's work to do instead of relying on a > small timeout value which can cause unproductive use of cpu cycles. > > >> --------------------------------------------------------------- > > 675fba3fdee0a391cfb6fc52d07b08caaca96c76 > src/CMakeLists.txt | 2 ++ > src/ChunkedIO.cc | 46 +++++++++++++++++++++++++++- > src/ChunkedIO.h | 16 +++++++++- > src/DNS_Mgr.cc | 5 +-- > src/DNS_Mgr.h | 3 +- > src/Flare.cc | 29 ++++++++++++++++++ > src/Flare.h | 45 +++++++++++++++++++++++++++ > src/FlowSrc.cc | 5 +-- > src/FlowSrc.h | 3 +- > src/IOSource.cc | 47 +++++++++++++++++++++------- > src/IOSource.h | 13 +++++--- > src/Pipe.cc | 79 ++++++++++++++++++++++++++++++++++++++++++++++++ > src/Pipe.h | 57 ++++++++++++++++++++++++++++++++++ > src/PktSrc.cc | 5 +-- > src/PktSrc.h | 3 +- > src/RemoteSerializer.cc | 47 +++++++++++++--------------- > src/RemoteSerializer.h | 3 +- > src/Serializer.cc | 5 +-- > src/Serializer.h | 3 +- > src/threading/Manager.cc | 3 +- > src/threading/Manager.h | 3 +- > 21 files changed, 364 insertions(+), 58 deletions(-) > > diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt > index 04867b7..3764533 100644 > --- a/src/CMakeLists.txt > +++ b/src/CMakeLists.txt > @@ -279,6 +279,7 @@ set(bro_SRCS > EventRegistry.cc > Expr.cc > File.cc > + Flare.cc > FlowSrc.cc > Frag.cc > Frame.cc > @@ -299,6 +300,7 @@ set(bro_SRCS > OSFinger.cc > PacketFilter.cc > PersistenceSerializer.cc > + Pipe.cc > PktSrc.cc > PolicyFile.cc > PrefixTable.cc > diff 
--git a/src/ChunkedIO.cc b/src/ChunkedIO.cc > index 54e2e59..a94eb98 100644 > --- a/src/ChunkedIO.cc > +++ b/src/ChunkedIO.cc > @@ -210,6 +210,7 @@ bool ChunkedIOFd::WriteChunk(Chunk* chunk, bool partial) > else > pending_head = pending_tail = q; > > + write_flare.Fire(); > return Flush(); > } > > @@ -232,6 +233,7 @@ bool ChunkedIOFd::PutIntoWriteBuffer(Chunk* chunk) > write_len += len; > > delete chunk; > + write_flare.Fire(); > > if ( network_time - last_flush > 0.005 ) > FlushWriteBuffer(); > @@ -269,6 +271,10 @@ bool ChunkedIOFd::FlushWriteBuffer() > if ( unsigned(written) == len ) > { > write_pos = write_len = 0; > + > + if ( ! pending_head ) > + write_flare.Extinguish(); > + > return true; > } > > @@ -318,7 +324,12 @@ bool ChunkedIOFd::Flush() > } > } > > - return FlushWriteBuffer(); > + bool rval = FlushWriteBuffer(); > + > + if ( ! pending_head && write_len == 0 ) > + write_flare.Extinguish(); > + > + return rval; > } > > uint32 ChunkedIOFd::ChunkAvailable() > @@ -394,6 +405,9 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block) > #ifdef DEBUG_COMMUNICATION > AddToBuffer("<false:read-chunk>", true); > #endif > + if ( ! ChunkAvailable() ) > + read_flare.Extinguish(); > + > return false; > } > > @@ -402,9 +416,15 @@ bool ChunkedIOFd::Read(Chunk** chunk, bool may_block) > #ifdef DEBUG_COMMUNICATION > AddToBuffer("<null:no-data>", true); > #endif > + read_flare.Extinguish(); > return true; > } > > + if ( ChunkAvailable() ) > + read_flare.Fire(); > + else > + read_flare.Extinguish(); > + > #ifdef DEBUG > if ( *chunk ) > DBG_LOG(DBG_CHUNKEDIO, "read of size %d %s[%s]", > @@ -481,6 +501,9 @@ bool ChunkedIOFd::ReadChunk(Chunk** chunk, bool may_block) > read_pos = 0; > read_len = bytes_left; > > + if ( ! ChunkAvailable() ) > + read_flare.Extinguish(); > + > // If allowed, wait a bit for something to read. 
> if ( may_block ) > { > @@ -607,6 +630,14 @@ bool ChunkedIOFd::IsFillingUp() > return stats.pending > MAX_BUFFERED_CHUNKS_SOFT; > } > > +std::vector<int> ChunkedIOFd::FdSupplements() const > + { > + std::vector<int> rval; > + rval.push_back(write_flare.FD()); > + rval.push_back(read_flare.FD()); > + return rval; > + } > + > void ChunkedIOFd::Clear() > { > while ( pending_head ) > @@ -618,6 +649,9 @@ void ChunkedIOFd::Clear() > } > > pending_head = pending_tail = 0; > + > + if ( write_len == 0 ) > + write_flare.Extinguish(); > } > > const char* ChunkedIOFd::Error() > @@ -830,6 +864,7 @@ bool ChunkedIOSSL::Write(Chunk* chunk) > else > write_head = write_tail = q; > > + write_flare.Fire(); > Flush(); > return true; > } > @@ -935,6 +970,7 @@ bool ChunkedIOSSL::Flush() > write_state = LEN; > } > > + write_flare.Extinguish(); > return true; > } > > @@ -1104,6 +1140,13 @@ bool ChunkedIOSSL::IsFillingUp() > return false; > } > > +std::vector<int> ChunkedIOSSL::FdSupplements() const > + { > + std::vector<int> rval; > + rval.push_back(write_flare.FD()); > + return rval; > + } > + > void ChunkedIOSSL::Clear() > { > while ( write_head ) > @@ -1114,6 +1157,7 @@ void ChunkedIOSSL::Clear() > write_head = next; > } > write_head = write_tail = 0; > + write_flare.Extinguish(); > } > > const char* ChunkedIOSSL::Error() > diff --git a/src/ChunkedIO.h b/src/ChunkedIO.h > index a9865e4..c640e52 100644 > --- a/src/ChunkedIO.h > +++ b/src/ChunkedIO.h > @@ -6,8 +6,9 @@ > #include "config.h" > #include "List.h" > #include "util.h" > - > +#include "Flare.h" > #include <list> > +#include <vector> > > #ifdef NEED_KRB5_H > # include <krb5.h> > @@ -95,6 +96,11 @@ public: > // Returns underlying fd if available, -1 otherwise. > virtual int Fd() { return -1; } > > + // Returns supplementary file descriptors that become read-ready in > order > + // to signal that there is some work that can be performed. 
> + virtual std::vector<int> FdSupplements() const > + { return std::vector<int>(); } > + > // Makes sure that no additional protocol data is written into > // the output stream. If this is activated, the output cannot > // be read again by any of these classes! > @@ -177,6 +183,7 @@ public: > virtual void Clear(); > virtual bool Eof() { return eof; } > virtual int Fd() { return fd; } > + virtual std::vector<int> FdSupplements() const; > virtual void Stats(char* buffer, int length); > > private: > @@ -240,6 +247,8 @@ private: > ChunkQueue* pending_tail; > > pid_t pid; > + bro::Flare write_flare; > + bro::Flare read_flare; > }; > > // Chunked I/O using an SSL connection. > @@ -262,6 +271,7 @@ public: > virtual void Clear(); > virtual bool Eof() { return eof; } > virtual int Fd() { return socket; } > + virtual std::vector<int> FdSupplements() const; > virtual void Stats(char* buffer, int length); > > private: > @@ -303,6 +313,8 @@ private: > > // One SSL for all connections. > static SSL_CTX* ctx; > + > + bro::Flare write_flare; > }; > > #include <zlib.h> > @@ -328,6 +340,8 @@ public: > > virtual bool Eof() { return io->Eof(); } > virtual int Fd() { return io->Fd(); } > + virtual std::vector<int> FdSupplements() const > + { return io->FdSupplements(); } > virtual void Stats(char* buffer, int length); > > void EnableCompression(int level) > diff --git a/src/DNS_Mgr.cc b/src/DNS_Mgr.cc > index 9188d61..9fb5c8b 100644 > --- a/src/DNS_Mgr.cc > +++ b/src/DNS_Mgr.cc > @@ -1217,9 +1217,10 @@ void DNS_Mgr::IssueAsyncRequests() > } > } > > -void DNS_Mgr::GetFds(int* read, int* write, int* except) > +void DNS_Mgr::GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except) > { > - *read = nb_dns_fd(nb_dns); > + read->push_back(nb_dns_fd(nb_dns)); > } > > double DNS_Mgr::NextTimestamp(double* network_time) > diff --git a/src/DNS_Mgr.h b/src/DNS_Mgr.h > index 7864505..fa19914 100644 > --- a/src/DNS_Mgr.h > +++ b/src/DNS_Mgr.h > @@ -132,7 +132,8 @@ 
protected: > void DoProcess(bool flush); > > // IOSource interface. > - virtual void GetFds(int* read, int* write, int* except); > + virtual void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except); > virtual double NextTimestamp(double* network_time); > virtual void Process(); > virtual const char* Tag() { return "DNS_Mgr"; } > diff --git a/src/Flare.cc b/src/Flare.cc > new file mode 100644 > index 0000000..8a0418f > --- /dev/null > +++ b/src/Flare.cc > @@ -0,0 +1,29 @@ > +// See the file "COPYING" in the main distribution directory for copyright. > + > +#include "Flare.h" > +#include "util.h" > +#include <unistd.h> > +#include <fcntl.h> > +#include <errno.h> > + > +using namespace bro; > + > +Flare::Flare() > + : pipe(FD_CLOEXEC, FD_CLOEXEC, O_NONBLOCK, O_NONBLOCK) > + { > + } > + > +void Flare::Fire() > + { > + char tmp; > + safe_write(pipe.WriteFD(), &tmp, 1); > + } > + > +void Flare::Extinguish() > + { > + char tmp[256]; > + > + for ( ; ; ) > + if ( read(pipe.ReadFD(), &tmp, sizeof(tmp)) == -1 && errno == > EAGAIN ) > + break; > + } > diff --git a/src/Flare.h b/src/Flare.h > new file mode 100644 > index 0000000..4e63788 > --- /dev/null > +++ b/src/Flare.h > @@ -0,0 +1,45 @@ > +// See the file "COPYING" in the main distribution directory for copyright. > + > +#ifndef BRO_FLARE_H > +#define BRO_FLARE_H > + > +#include "Pipe.h" > + > +namespace bro { > + > +class Flare { > +public: > + > + /** > + * Create a flare object that can be used to signal a "ready" status via > + * a file descriptor that may be integrated with select(), poll(), etc. > + * Not thread-safe, but that should only require Fire()/Extinguish() > calls > + * to be made mutually exclusive (across all copies of a Flare). > + */ > + Flare(); > + > + /** > + * @return a file descriptor that will become ready if the flare has > been > + * Fire()'d and not yet Extinguished()'d. 
> + */ > + int FD() const > + { return pipe.ReadFD(); } > + > + /** > + * Put the object in the "ready" state. > + */ > + void Fire(); > + > + /** > + * Take the object out of the "ready" state. > + */ > + void Extinguish(); > + > +private: > + > + Pipe pipe; > +}; > + > +} // namespace bro > + > +#endif // BRO_FLARE_H > diff --git a/src/FlowSrc.cc b/src/FlowSrc.cc > index 8eed94f..4999d9c 100644 > --- a/src/FlowSrc.cc > +++ b/src/FlowSrc.cc > @@ -28,10 +28,11 @@ FlowSrc::~FlowSrc() > delete netflow_analyzer; > } > > -void FlowSrc::GetFds(int* read, int* write, int* except) > +void FlowSrc::GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except) > { > if ( selectable_fd >= 0 ) > - *read = selectable_fd; > + read->push_back(selectable_fd); > } > > double FlowSrc::NextTimestamp(double* network_time) > diff --git a/src/FlowSrc.h b/src/FlowSrc.h > index 03dda27..ee92760 100644 > --- a/src/FlowSrc.h > +++ b/src/FlowSrc.h > @@ -34,7 +34,8 @@ public: > > // IOSource interface: > bool IsReady(); > - void GetFds(int* read, int* write, int* except); > + void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except); > double NextTimestamp(double* network_time); > void Process(); > > diff --git a/src/IOSource.cc b/src/IOSource.cc > index d47007c..540b797 100644 > --- a/src/IOSource.cc > +++ b/src/IOSource.cc > @@ -24,6 +24,15 @@ void IOSourceRegistry::RemoveAll() > dont_counts = sources.size(); > } > > +static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max) > + { > + for ( size_t i = 0; i < fds.size(); ++i ) > + { > + FD_SET(fds[i], set); > + *max = ::max(fds[i], *max); > + } > + } > + > IOSource* IOSourceRegistry::FindSoonest(double* ts) > { > // Remove sources which have gone dry. For simplicity, we only > @@ -94,16 +103,14 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts) > // be ready. 
> continue; > > - src->fd_read = src->fd_write = src->fd_except = 0; > + src->fd_read.clear(); > + src->fd_write.clear(); > + src->fd_except.clear(); > src->src->GetFds(&src->fd_read, &src->fd_write, > &src->fd_except); > > - FD_SET(src->fd_read, &fd_read); > - FD_SET(src->fd_write, &fd_write); > - FD_SET(src->fd_except, &fd_except); > - > - maxx = max(src->fd_read, maxx); > - maxx = max(src->fd_write, maxx); > - maxx = max(src->fd_except, maxx); > + fd_vector_set(src->fd_read, &fd_read, &maxx); > + fd_vector_set(src->fd_write, &fd_write, &maxx); > + fd_vector_set(src->fd_except, &fd_except, &maxx); > } > > // We can't block indefinitely even when all sources are dry: > @@ -143,9 +150,7 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts) > if ( ! src->src->IsIdle() ) > continue; > > - if ( FD_ISSET(src->fd_read, &fd_read) || > - FD_ISSET(src->fd_write, &fd_write) || > - FD_ISSET(src->fd_except, &fd_except) ) > + if ( src->Ready(&fd_read, &fd_write, &fd_except) ) > { > double local_network_time = 0; > double ts = > src->src->NextTimestamp(&local_network_time); > @@ -174,3 +179,23 @@ void IOSourceRegistry::Register(IOSource* src, bool > dont_count) > ++dont_counts; > return sources.push_back(s); > } > + > +static bool fd_vector_ready(const std::vector<int>& fds, fd_set* set) > + { > + for ( size_t i = 0; i < fds.size(); ++i ) > + if ( FD_ISSET(fds[i], set) ) > + return true; > + > + return false; > + } > + > +bool IOSourceRegistry::Source::Ready(fd_set* read, fd_set* write, > + fd_set* except) const > + { > + if ( fd_vector_ready(fd_read, read) || > + fd_vector_ready(fd_write, write) || > + fd_vector_ready(fd_except, except) ) > + return true; > + > + return false; > + } > diff --git a/src/IOSource.h b/src/IOSource.h > index db50bbd..3da70af 100644 > --- a/src/IOSource.h > +++ b/src/IOSource.h > @@ -4,6 +4,8 @@ > #define iosource_h > > #include <list> > +#include <vector> > +#include <sys/select.h> > #include "Timer.h" > > using namespace std; > @@ -22,7 +24,8 @@ 
public: > > // Returns select'able fds (leaves args untouched if we don't have > // selectable fds). > - virtual void GetFds(int* read, int* write, int* except) = 0; > + virtual void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except) = 0; > > // The following two methods are only called when either IsIdle() > // returns false or select() on one of the fds indicates that there's > @@ -89,9 +92,11 @@ protected: > > struct Source { > IOSource* src; > - int fd_read; > - int fd_write; > - int fd_except; > + std::vector<int> fd_read; > + std::vector<int> fd_write; > + std::vector<int> fd_except; > + > + bool Ready(fd_set* read, fd_set* write, fd_set* except) const; > }; > > typedef list<Source*> SourceList; > diff --git a/src/Pipe.cc b/src/Pipe.cc > new file mode 100644 > index 0000000..51298d0 > --- /dev/null > +++ b/src/Pipe.cc > @@ -0,0 +1,79 @@ > +// See the file "COPYING" in the main distribution directory for copyright. > + > +#include "Pipe.h" > +#include "Reporter.h" > +#include <unistd.h> > +#include <fcntl.h> > +#include <errno.h> > +#include <cstdio> > + > +using namespace bro; > + > +static void pipe_fail(int eno) > + { > + char tmp[256]; > + strerror_r(eno, tmp, sizeof(tmp)); > + reporter->FatalError("Pipe failure: %s", tmp); > + } > + > +static void set_flags(int fd, int flags) > + { > + if ( flags ) > + fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | flags); > + } > + > +static void set_status_flags(int fd, int flags) > + { > + if ( flags ) > + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | flags); > + } > + > +static int dup_or_fail(int fd, int flags) > + { > + int rval = dup(fd); > + > + if ( rval < 0 ) > + pipe_fail(errno); > + > + set_flags(fd, flags); > + return rval; > + } > + > +Pipe::Pipe(int flags0, int flags1, int status_flags0, int status_flags1) > + { > + // pipe2 can set flags atomically, but not yet available everywhere. 
> + if ( ::pipe(fds) ) > + pipe_fail(errno); > + > + flags[0] = flags0; > + flags[1] = flags1; > + > + set_flags(fds[0], flags[0]); > + set_flags(fds[1], flags[1]); > + set_status_flags(fds[0], status_flags0); > + set_status_flags(fds[1], status_flags1); > + } > + > +Pipe::~Pipe() > + { > + close(fds[0]); > + close(fds[1]); > + } > + > +Pipe::Pipe(const Pipe& other) > + { > + fds[0] = dup_or_fail(other.fds[0], other.flags[0]); > + fds[1] = dup_or_fail(other.fds[1], other.flags[1]); > + } > + > +Pipe& Pipe::operator=(const Pipe& other) > + { > + if ( this == &other ) > + return *this; > + > + close(fds[0]); > + close(fds[1]); > + fds[0] = dup_or_fail(other.fds[0], other.flags[0]); > + fds[1] = dup_or_fail(other.fds[1], other.flags[1]); > + return *this; > + } > diff --git a/src/Pipe.h b/src/Pipe.h > new file mode 100644 > index 0000000..493169e > --- /dev/null > +++ b/src/Pipe.h > @@ -0,0 +1,57 @@ > +// See the file "COPYING" in the main distribution directory for copyright. > + > +#ifndef BRO_PIPE_H > +#define BRO_PIPE_H > + > +namespace bro { > + > +class Pipe { > +public: > + > + /** > + * Create a pair of file descriptors via pipe(), or aborts if it cannot. > + * @param flags0 file descriptor flags to set on read end of pipe. > + * @param flags1 file descriptor flags to set on write end of pipe. > + * @param status_flags0 descriptor status flags to set on read end of > pipe. > + * @param status_flags1 descriptor status flags to set on write end of > pipe. > + */ > + Pipe(int flags0 = 0, int flags1 = 0, int status_flags0 = 0, > + int status_flags1 = 0); > + > + /** > + * Close the pair of file descriptors owned by the object. > + */ > + ~Pipe(); > + > + /** > + * Make a copy of another Pipe object (file descriptors are dup'd). > + */ > + Pipe(const Pipe& other); > + > + /** > + * Assign a Pipe object by closing file descriptors and duping those of > + * the other. 
> + */ > + Pipe& operator=(const Pipe& other); > + > + /** > + * @return the file descriptor associated with the read-end of the pipe. > + */ > + int ReadFD() const > + { return fds[0]; } > + > + /** > + * @return the file descriptor associated with the write-end of the > pipe. > + */ > + int WriteFD() const > + { return fds[1]; } > + > +private: > + > + int fds[2]; > + int flags[2]; > +}; > + > +} // namespace bro > + > +#endif // BRO_PIPE_H > diff --git a/src/PktSrc.cc b/src/PktSrc.cc > index b5ac3a5..04b7b7d 100644 > --- a/src/PktSrc.cc > +++ b/src/PktSrc.cc > @@ -51,7 +51,8 @@ PktSrc::~PktSrc() > delete [] readfile; > } > > -void PktSrc::GetFds(int* read, int* write, int* except) > +void PktSrc::GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except) > { > if ( pseudo_realtime ) > { > @@ -62,7 +63,7 @@ void PktSrc::GetFds(int* read, int* write, int* except) > } > > if ( selectable_fd >= 0 ) > - *read = selectable_fd; > + read->push_back(selectable_fd); > } > > int PktSrc::ExtractNextPacket() > diff --git a/src/PktSrc.h b/src/PktSrc.h > index 70eef4d..0d4be12 100644 > --- a/src/PktSrc.h > +++ b/src/PktSrc.h > @@ -98,7 +98,8 @@ public: > > // IOSource interface > bool IsReady(); > - void GetFds(int* read, int* write, int* except); > + void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except); > double NextTimestamp(double* local_network_time); > void Process(); > const char* Tag() { return "PktSrc"; } > diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc > index 3e46c5a..34c5f1a 100644 > --- a/src/RemoteSerializer.cc > +++ b/src/RemoteSerializer.cc > @@ -1368,12 +1368,17 @@ void RemoteSerializer::Unregister(ID* id) > } > } > > -void RemoteSerializer::GetFds(int* read, int* write, int* except) > +void RemoteSerializer::GetFds(std::vector<int>* read, std::vector<int>* > write, > + std::vector<int>* except) > { > - *read = io->Fd(); > + read->push_back(io->Fd()); > + std::vector<int> supp = 
io->FdSupplements(); > + > + for ( size_t i = 0; i < supp.size(); ++i ) > + read->push_back(supp[i]); > > if ( io->CanWrite() ) > - *write = io->Fd(); > + write->push_back(io->Fd()); > } > > double RemoteSerializer::NextTimestamp(double* local_network_time) > @@ -3356,6 +3361,15 @@ SocketComm::~SocketComm() > > static unsigned int first_rtime = 0; > > +static void fd_vector_set(const std::vector<int>& fds, fd_set* set, int* max) > + { > + for ( size_t i = 0; i < fds.size(); ++i ) > + { > + FD_SET(fds[i], set); > + *max = ::max(fds[i], *max); > + } > + } > + > void SocketComm::Run() > { > first_rtime = (unsigned int) current_time(true); > @@ -3381,6 +3395,7 @@ void SocketComm::Run() > > FD_SET(io->Fd(), &fd_read); > max_fd = io->Fd(); > + fd_vector_set(io->FdSupplements(), &fd_read, &max_fd); > > loop_over_list(peers, i) > { > @@ -3389,6 +3404,7 @@ void SocketComm::Run() > FD_SET(peers[i]->io->Fd(), &fd_read); > if ( peers[i]->io->Fd() > max_fd ) > max_fd = peers[i]->io->Fd(); > + fd_vector_set(peers[i]->io->FdSupplements(), > &fd_read, &max_fd); > } > else > { > @@ -3439,38 +3455,17 @@ void SocketComm::Run() > if ( ! io->IsFillingUp() && shutting_conns_down ) > shutting_conns_down = false; > > - // We cannot rely solely on select() as the there may > - // be some data left in our input/output queues. So, we use > - // a small timeout for select and check for data > - // manually afterwards. > - > static long selects = 0; > static long canwrites = 0; > - static long timeouts = 0; > > ++selects; > if ( io->CanWrite() ) > ++canwrites; > > - // FIXME: Fine-tune this (timeouts, flush, etc.) > - struct timeval small_timeout; > - small_timeout.tv_sec = 0; > - small_timeout.tv_usec = > - io->CanWrite() || io->CanRead() ? 1 : 10; > - > -#if 0 > - if ( ! 
io->CanWrite() ) > - usleep(10); > -#endif > - > - int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, > - &small_timeout); > - > - if ( a == 0 ) > - ++timeouts; > + int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0); > > if ( selects % 100000 == 0 ) > - Log(fmt("selects=%ld canwrites=%ld timeouts=%ld", > selects, canwrites, timeouts)); > + Log(fmt("selects=%ld canwrites=%ld", selects, > canwrites)); > > if ( a < 0 ) > // Ignore errors for now. > diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h > index 9dbfbd9..3aa4f91 100644 > --- a/src/RemoteSerializer.h > +++ b/src/RemoteSerializer.h > @@ -140,7 +140,8 @@ public: > void Finish(); > > // Overidden from IOSource: > - virtual void GetFds(int* read, int* write, int* except); > + virtual void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except); > virtual double NextTimestamp(double* local_network_time); > virtual void Process(); > virtual TimerMgr::Tag* GetCurrentTag(); > diff --git a/src/Serializer.cc b/src/Serializer.cc > index 36b1c74..0ea79cf 100644 > --- a/src/Serializer.cc > +++ b/src/Serializer.cc > @@ -1067,9 +1067,10 @@ void EventPlayer::GotFunctionCall(const char* name, > double time, > // We don't replay function calls. 
> } > > -void EventPlayer::GetFds(int* read, int* write, int* except) > +void EventPlayer::GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except) > { > - *read = fd; > + read->push_back(fd); > } > > double EventPlayer::NextTimestamp(double* local_network_time) > diff --git a/src/Serializer.h b/src/Serializer.h > index 543797a..0524906 100644 > --- a/src/Serializer.h > +++ b/src/Serializer.h > @@ -355,7 +355,8 @@ public: > EventPlayer(const char* file); > virtual ~EventPlayer(); > > - virtual void GetFds(int* read, int* write, int* except); > + virtual void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except); > virtual double NextTimestamp(double* local_network_time); > virtual void Process(); > virtual const char* Tag() { return "EventPlayer"; } > diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc > index 4491cd4..c16b9f4 100644 > --- a/src/threading/Manager.cc > +++ b/src/threading/Manager.cc > @@ -65,7 +65,8 @@ void Manager::AddMsgThread(MsgThread* thread) > msg_threads.push_back(thread); > } > > -void Manager::GetFds(int* read, int* write, int* except) > +void Manager::GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except) > { > } > > diff --git a/src/threading/Manager.h b/src/threading/Manager.h > index e839749..4f0e539 100644 > --- a/src/threading/Manager.h > +++ b/src/threading/Manager.h > @@ -103,7 +103,8 @@ protected: > /** > * Part of the IOSource interface. > */ > - virtual void GetFds(int* read, int* write, int* except); > + virtual void GetFds(std::vector<int>* read, std::vector<int>* write, > + std::vector<int>* except); > > /** > * Part of the IOSource interface. 
> > _______________________________________________ > bro-commits mailing list > [email protected] > http://mailman.icsi.berkeley.edu/mailman/listinfo/bro-commits > _______________________________________________ bro-dev mailing list [email protected] http://mailman.icsi.berkeley.edu/mailman/listinfo/bro-dev
