Revision: 6501
          http://playerstage.svn.sourceforge.net/playerstage/?rev=6501&view=rev
Author:   thjc
Date:     2008-06-09 22:16:08 -0700 (Mon, 09 Jun 2008)

Log Message:
-----------
Added a file watcher class which allows drivers to delegate watching a file to
the server. This removes the need for non-blocking reads followed by a sleep
(the non-blocking read is still a good idea, but drivers can now use Wait()
instead of the sleep).

Modified Paths:
--------------
    code/player/branches/release-2-1-patches/libplayercore/Makefile.am
    code/player/branches/release-2-1-patches/libplayercore/driver.cc
    code/player/branches/release-2-1-patches/libplayercore/driver.h
    code/player/branches/release-2-1-patches/libplayercore/globals.cc
    code/player/branches/release-2-1-patches/libplayercore/globals.h
    code/player/branches/release-2-1-patches/libplayercore/playercore.h
    code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
    code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc
    code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc
    code/player/branches/release-2-1-patches/server/server.cc

Added Paths:
-----------
    code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
    code/player/branches/release-2-1-patches/libplayercore/filewatcher.h

Modified: code/player/branches/release-2-1-patches/libplayercore/Makefile.am
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/Makefile.am  
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/Makefile.am  
2008-06-10 05:16:08 UTC (rev 6501)
@@ -21,6 +21,7 @@
                           configfile.cc configfile.h \
                           playercommon.h player.h playerconfig.h.in \
                           message.h message.cc \
+                          filewatcher.cc filewatcher.h \
                           wallclocktime.cc wallclocktime.h playertime.h \
                           plugins.cc plugins.h \
                           globals.cc globals.h \
@@ -43,6 +44,7 @@
                       devicetable.h \
                       driver.h \
                       drivertable.h \
+                      filewatcher.h \
                       globals.h \
                       interface_util.h \
                       message.h \

Modified: code/player/branches/release-2-1-patches/libplayercore/driver.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/driver.cc    
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/driver.cc    
2008-06-10 05:16:08 UTC (rev 6501)
@@ -63,6 +63,7 @@
 #include <libplayercore/devicetable.h>
 #include <libplayercore/configfile.h>
 #include <libplayercore/globals.h>
+#include <libplayercore/filewatcher.h>
 #include <libplayercore/property.h>
 #include <libplayercore/interface_util.h>
 
@@ -228,10 +229,10 @@
 }
 
 void
-Driver::Publish(player_devaddr_t addr, 
-                uint8_t type, 
+Driver::Publish(player_devaddr_t addr,
+                uint8_t type,
                 uint8_t subtype,
-                void* src, 
+                void* src,
                 size_t deprecated,
                 double* timestamp,
                 bool copy)
@@ -304,6 +305,19 @@
   return( shutdownResult );
 }
 
+/** @brief Register a watch on a file descriptor with the global file watcher.
+
+    The watch is tied to this driver's InQueue, which is the queue the file
+    watcher notifies when one of the selected events occurs on the descriptor.
+
+    @param fd File descriptor to watch
+    @param ReadWatch Watch for the descriptor becoming readable
+    @param WriteWatch Watch for the descriptor becoming writable
+    @param ExceptWatch Watch for exceptional conditions on the descriptor
+    @returns Whatever FileWatcher::AddFileWatch() returns */
+int
+Driver::AddFileWatch(int fd, bool ReadWatch, bool WriteWatch, bool ExceptWatch)
+{
+  const int result = fileWatcher->AddFileWatch(fd, this->InQueue,
+                                               ReadWatch, WriteWatch, ExceptWatch);
+  return( result );
+}
+
+/** @brief Remove a watch previously registered with AddFileWatch().
+
+    The file watcher matches on the descriptor, this driver's InQueue and all
+    three event flags, so pass exactly the arguments used when adding.
+
+    @param fd File descriptor that was being watched
+    @param ReadWatch Value of ReadWatch given when the watch was added
+    @param WriteWatch Value of WriteWatch given when the watch was added
+    @param ExceptWatch Value of ExceptWatch given when the watch was added
+    @returns Whatever FileWatcher::RemoveFileWatch() returns */
+int
+Driver::RemoveFileWatch(int fd, bool ReadWatch, bool WriteWatch, bool ExceptWatch)
+{
+  const int result = fileWatcher->RemoveFileWatch(fd, this->InQueue,
+                                                  ReadWatch, WriteWatch, ExceptWatch);
+  return( result );
+}
+
+
 /* start a thread that will invoke Main() */
 void
 Driver::StartThread(void)

Modified: code/player/branches/release-2-1-patches/libplayercore/driver.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/driver.h     
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/driver.h     
2008-06-10 05:16:08 UTC (rev 6501)
@@ -146,7 +146,7 @@
     @returns 0 on success, non-zero otherwise. */
     int AddInterface(player_devaddr_t *addr, ConfigFile * cf, int section, int 
code, char * key = NULL);
 
-    
+
     /** @brief Set/reset error code */
     void SetError(int code) {this->error = code;}
 
@@ -154,13 +154,20 @@
 
     Call this method to block until a new message arrives on
     Driver::InQueue.  This method will return true immediately if at least
-    one message is already waiting. 
-    
+    one message is already waiting.
+
     If TimeOut is set to a positive value this method will return false if the
     timeout occurs before and update is recieved.
     */
     bool Wait(double TimeOut=0.0) { return this->InQueue->Wait(); }
 
+    /** @brief Wake up the driver if the specified event occurs on the file 
descriptor */
+    int AddFileWatch(int fd, bool ReadWatch = true, bool WriteWatch = false, 
bool ExceptWatch = true);
+
+    /** @brief Remove a previously added watch, call with the same arguments 
as when adding the watch */
+    int RemoveFileWatch(int fd, bool ReadWatch = true, bool WriteWatch = 
false, bool ExceptWatch = true);
+
+
   public:
     /** @brief The driver's thread.
 
@@ -203,10 +210,10 @@
                  bool copy=true);
 
      /** @brief Publish a message via one of this driver's interfaces.
-     
+
      This form of Publish will assemble the message header for you.
      The message is broadcast to all interested parties
-     
+
      @param addr The origin address
      @param type The message type
      @param subtype The message subtype
@@ -215,23 +222,23 @@
      @param timestamp Timestamp for the message body (if NULL, then the
      current time will be filled in)
      @param copy if set to false the data will be claimed and the caller 
should no longer use or free it */
-     virtual void Publish(player_devaddr_t addr, 
-                  uint8_t type, 
+     virtual void Publish(player_devaddr_t addr,
+                  uint8_t type,
                   uint8_t subtype,
-                  void* src=NULL, 
+                  void* src=NULL,
                   size_t deprecated=0,
                   double* timestamp=NULL,
                   bool copy=true);
- 
- 
 
+
+
     /** @brief Publish a message via one of this driver's interfaces.
 
     Use this form of Publish if you already have the message header
     assembled and have a target queue to send to.
     @param queue the target queue.
     @param hdr The message header
-    @param src The message body 
+    @param src The message body
     @param copy if set to false the data will be claimed and the caller should 
no longer use or free it */
     virtual void Publish(QueuePointer &queue,
                  player_msghdr_t* hdr,
@@ -243,8 +250,8 @@
     Use this form of Publish if you already have the message header
     assembled and wish to broadcast the message to all subscribed parties.
     @param hdr The message header
-    @param src The message body 
-    @param copy if set to false the data will be claimed and the caller should 
no longer use or free it */ 
+    @param src The message body
+    @param copy if set to false the data will be claimed and the caller should 
no longer use or free it */
     virtual void Publish(player_msghdr_t* hdr,
                  void* src,
                  bool copy = true);
@@ -321,15 +328,15 @@
     subscriptions to the driver; a driver MAY override them, but
     usually won't. This alternative form includes the clients queue
     so you can map future requests and unsubscriptions to a specific queue.
-    
+
     If this methods returns a value other than 1 then the other form of 
subscribe wont be called
 
     @param queue The queue of the subscribing client
     @param addr Address of the device to subscribe to (the driver may
     have more than one interface).
     @returns Returns 0 on success, -ve on error and 1 for unimplemented. */
-    virtual int Subscribe(QueuePointer &queue, player_devaddr_t addr) {return 
1;};    
-    
+    virtual int Subscribe(QueuePointer &queue, player_devaddr_t addr) {return 
1;};
+
     /** @brief Unsubscribe from this driver.
 
     The Subscribe() and Unsubscribe() methods are used to control
@@ -347,7 +354,7 @@
     subscriptions to the driver; a driver MAY override them, but
     usually won't.This alternative form includes the clients queue
     so you can map future requests and unsubscriptions to a specific queue.
-    
+
     If this methods returns a value other than 1 then the other form of 
subscribe wont be called
 
     @param queue The queue of the subscribing client
@@ -355,7 +362,7 @@
     have more than one interface).
     @returns Returns 0 on success. */
     virtual int Unsubscribe(QueuePointer &queue, player_devaddr_t addr) 
{return 1;};
-    
+
     /** @brief Initialize the driver.
 
     This function is called with the first client subscribes; it MUST
@@ -447,7 +454,6 @@
     @param section Configuration file section that may define the property 
value
     @return True if the property was registered, false otherwise */
     virtual bool RegisterProperty(Property *prop, ConfigFile* cf, int section);
-
 };
 
 

Added: code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc       
                        (rev 0)
+++ code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc       
2008-06-10 05:16:08 UTC (rev 6501)
@@ -0,0 +1,188 @@
+/*
+ * FileWatcher.cc
+ *
+ *  Created on: 10/06/2008
+ *      Author: tcollett
+ */
+
+#include <libplayercore/filewatcher.h>
+#include <libplayercore/message.h>
+#include <libplayercore/error.h>
+#include <sys/time.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <error.h>
+#include <math.h>
+#include <string.h>
+
+/** Create an empty watcher: a zero-filled table with room for
+    INITIAL_WATCHED_FILES_ARRAY_SIZE entries and a default-attribute mutex. */
+FileWatcher::FileWatcher()
+{
+       pthread_mutex_init(&this->lock,NULL);
+
+       this->WatchedFilesArrayCount = 0;
+       this->WatchedFilesArraySize = INITIAL_WATCHED_FILES_ARRAY_SIZE;
+
+       // calloc so that every slot starts out zero-filled
+       void * table = calloc(this->WatchedFilesArraySize,sizeof(struct fd_driver_pair));
+       this->WatchedFiles = reinterpret_cast<struct fd_driver_pair *> (table);
+       assert(this->WatchedFiles);
+}
+
+/** Release everything the watcher owns: the queue references held by the
+    table entries, the table itself and the mutex. */
+FileWatcher::~FileWatcher()
+{
+       // The table is malloc-managed, so free() runs no destructors. Reset each
+       // used slot's queue first so that no reference is left behind
+       // (presumably QueuePointer is reference counted -- confirm in message.h).
+       for (size_t ii = 0; ii < WatchedFilesArrayCount; ++ii)
+               WatchedFiles[ii].queue = QueuePointer();
+
+       free(WatchedFiles);
+       WatchedFiles = NULL;
+       WatchedFilesArrayCount = 0;
+       WatchedFilesArraySize = 0;
+
+       // pairs with pthread_mutex_init() in the constructor
+       pthread_mutex_destroy(&this->lock);
+}
+
+/** Acquire the mutex that guards the watch table. */
+void
+FileWatcher::Lock()
+{
+  pthread_mutex_lock(&this->lock);
+}
+
+/** Release the mutex that guards the watch table. */
+void
+FileWatcher::Unlock()
+{
+  pthread_mutex_unlock(&this->lock);
+}
+
+
+/** @brief Run one select() pass over every watched file descriptor.
+ *
+ * Builds the read/write/except sets from the watch table, blocks in select()
+ * for at most Timeout seconds with the table unlocked, then calls
+ * DataAvailable() on the queue of every entry whose descriptor is ready.
+ *
+ * @param Timeout Maximum time to block, in seconds (fractions are honoured).
+ * @returns The number of ready entries that have no queue attached; 0 if
+ * nothing is being watched or the timeout expired; the negative select()
+ * result on failure.
+ */
+int FileWatcher::Wait(double Timeout)
+{
+       Lock();
+       if (WatchedFilesArrayCount == 0)
+       {
+               Unlock();
+               return 0;
+       }
+
+       // initialise our FD sets for the select call
+       fd_set ReadFds,WriteFds,ExceptFds;
+       FD_ZERO(&ReadFds);
+       FD_ZERO(&WriteFds);
+       FD_ZERO(&ExceptFds);
+
+       int maxfd = 0;
+
+       for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+       {
+               const int fd = WatchedFiles[ii].fd;
+               // negative fds mark free slots; an fd_set cannot hold fds >= FD_SETSIZE
+               if (fd < 0 || fd >= FD_SETSIZE)
+                       continue;
+
+               if (fd > maxfd)
+                       maxfd = fd;
+               if (WatchedFiles[ii].Read)
+                       FD_SET(fd,&ReadFds);
+               if (WatchedFiles[ii].Write)
+                       FD_SET(fd,&WriteFds);
+               if (WatchedFiles[ii].Except)
+                       FD_SET(fd,&ExceptFds);
+       }
+
+       // split the timeout into whole seconds and microseconds
+       struct timeval t;
+       t.tv_sec = static_cast<int> (floor(Timeout));
+       t.tv_usec = static_cast<int> ((Timeout - static_cast<int> (floor(Timeout))) * 1e6);
+
+       // do not hold the lock while blocked, so watches can be added or removed meanwhile
+       Unlock();
+       int ret = select (maxfd+1,&ReadFds,&WriteFds,&ExceptFds,&t);
+
+       if (ret < 0)
+       {
+               PLAYER_ERROR2("Select called failed in File Watcher: %d %s",errno,strerror(errno));
+               return ret;
+       }
+       if (ret == 0)
+       {
+               // timed out: no descriptor is ready, so there is nothing to dispatch
+               return 0;
+       }
+
+       int queueless_count = 0;
+
+       Lock();
+       // Walk the whole table. The loop index is a table position and has no
+       // relation to maxfd, and maxfd itself (as well as fd 0) is a valid
+       // descriptor that must be checked like any other.
+       for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+       {
+               const int fd = WatchedFiles[ii].fd;
+               if (fd < 0 || fd >= FD_SETSIZE)
+                       continue;
+
+               const bool ready =
+                               (WatchedFiles[ii].Read && FD_ISSET(fd,&ReadFds)) ||
+                               (WatchedFiles[ii].Write && FD_ISSET(fd,&WriteFds)) ||
+                               (WatchedFiles[ii].Except && FD_ISSET(fd,&ExceptFds));
+               if (!ready)
+                       continue;
+
+               QueuePointer &q = WatchedFiles[ii].queue;
+               if (q != NULL)
+                       q->DataAvailable();
+               else
+                       queueless_count++;
+       }
+       Unlock();
+
+       return queueless_count;
+}
+
+/** Add a watch that has no queue attached. The watch is stored with an
+    empty QueuePointer; all other arguments are forwarded unchanged to the
+    queue-taking overload. */
+int FileWatcher::AddFileWatch(int fd, bool WatchRead, bool WatchWrite, bool WatchExcept)
+{
+       QueuePointer no_queue;
+       const int result = AddFileWatch(fd, no_queue, WatchRead, WatchWrite, WatchExcept);
+       return result;
+}
+
+
+/** @brief Start watching a file descriptor on behalf of a queue.
+ *
+ * Slot selection: append while the table has spare capacity; once it is full,
+ * reuse a slot freed by RemoveFileWatch() (fd < 0); if none is free, double
+ * the table and append.
+ *
+ * @param fd File descriptor to watch
+ * @param queue Queue that Wait() notifies when fd is ready
+ * @param WatchRead Watch for fd becoming readable
+ * @param WatchWrite Watch for fd becoming writable
+ * @param WatchExcept Watch for exceptional conditions on fd
+ * @returns 0 on success, -1 if the table could not be grown
+ */
+int FileWatcher::AddFileWatch(int fd, QueuePointer & queue, bool WatchRead, bool WatchWrite, bool WatchExcept)
+{
+       Lock();
+       struct fd_driver_pair *next_entry = NULL;
+
+       if (WatchedFilesArrayCount >= WatchedFilesArraySize)
+       {
+               // table is full: first see if there is an empty spot in the list
+               for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+               {
+                       if (WatchedFiles[ii].fd < 0)
+                       {
+                               next_entry = &WatchedFiles[ii];
+                               break;
+                       }
+               }
+
+               if (next_entry == NULL)
+               {
+                       // otherwise we allocate some more room for the array
+                       const size_t old_size = WatchedFilesArraySize;
+                       const size_t new_size = old_size * 2;
+                       struct fd_driver_pair *grown = reinterpret_cast<struct fd_driver_pair *>
+                                       (realloc(WatchedFiles,sizeof(WatchedFiles[0])*new_size));
+                       if (grown == NULL)
+                       {
+                               // realloc leaves the old table intact on failure; keep using it
+                               Unlock();
+                               return -1;
+                       }
+                       // zero the new tail so its slots match the calloc'ed initial table
+                       // before a QueuePointer is assigned onto them
+                       memset(static_cast<void *> (grown + old_size), 0,
+                                       sizeof(grown[0])*(new_size - old_size));
+                       WatchedFiles = grown;
+                       WatchedFilesArraySize = new_size;
+               }
+       }
+
+       if (next_entry == NULL)
+       {
+               // append, and count the slot so that Wait() and RemoveFileWatch() see it
+               next_entry = &WatchedFiles[WatchedFilesArrayCount];
+               WatchedFilesArrayCount++;
+       }
+
+       next_entry->fd = fd;
+       next_entry->queue = queue;
+       next_entry->Read = WatchRead;
+       next_entry->Write = WatchWrite;
+       next_entry->Except = WatchExcept;
+       Unlock();
+       return 0;
+}
+
+/** Remove a watch that was added without a queue. The lookup is done with
+    an empty QueuePointer; all other arguments are forwarded unchanged to the
+    queue-taking overload. */
+int FileWatcher::RemoveFileWatch(int fd, bool WatchRead, bool WatchWrite, bool WatchExcept)
+{
+       QueuePointer no_queue;
+       const int result = RemoveFileWatch(fd, no_queue, WatchRead, WatchWrite, WatchExcept);
+       return result;
+}
+
+
+/** @brief Stop watching a file descriptor.
+ *
+ * Finds the first entry whose descriptor, queue and three event flags all
+ * match and frees its slot. Only one entry is removed per call, so call
+ * this once for every matching AddFileWatch().
+ *
+ * @param fd File descriptor that was being watched
+ * @param queue Queue given when the watch was added
+ * @param WatchRead Value of WatchRead given when the watch was added
+ * @param WatchWrite Value of WatchWrite given when the watch was added
+ * @param WatchExcept Value of WatchExcept given when the watch was added
+ * @returns 0 if a matching watch was removed, -1 if none was found
+ */
+int FileWatcher::RemoveFileWatch(int fd, QueuePointer &queue, bool WatchRead, bool WatchWrite, bool WatchExcept)
+{
+       int result = -1;
+
+       Lock();
+       for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+       {
+               struct fd_driver_pair &entry = WatchedFiles[ii];
+               if (entry.fd == fd &&
+                               entry.queue == queue &&
+                               entry.Read == WatchRead &&
+                               entry.Write == WatchWrite &&
+                               entry.Except == WatchExcept)
+               {
+                       // a negative fd marks the slot as free for AddFileWatch() to reuse
+                       entry.fd = -1;
+                       // let go of the queue now instead of holding it until the slot
+                       // is reused (presumably QueuePointer is reference counted --
+                       // confirm in message.h)
+                       entry.queue = QueuePointer();
+                       result = 0;
+                       break;
+               }
+       }
+       Unlock();
+
+       return result;
+}
+


Property changes on: 
code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/filewatcher.h        
                        (rev 0)
+++ code/player/branches/release-2-1-patches/libplayercore/filewatcher.h        
2008-06-10 05:16:08 UTC (rev 6501)
@@ -0,0 +1,56 @@
+/*
+ * filewatcher.h
+ *
+ *  Created on: 10/06/2008
+ *      Author: tcollett
+ */
+
+#ifndef FILEWATCHER_H_
+#define FILEWATCHER_H_
+
+#include <sys/select.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <libplayercore/message.h>
+
+class MessageQueue;
+
+/** One entry of the FileWatcher table: a file descriptor, the queue to
+    notify about it, and the select() sets the descriptor takes part in. */
+struct fd_driver_pair
+{
+       /// Watched file descriptor; FileWatcher sets it to -1 to mark the slot free
+       int fd;
+       /// Queue whose DataAvailable() is called when fd is ready; may be empty
+       QueuePointer queue;
+       /// Put fd in the select() read set
+       bool Read;
+       /// Put fd in the select() write set
+       bool Write;
+       /// Put fd in the select() exception set
+       bool Except;
+};
+
+const size_t INITIAL_WATCHED_FILES_ARRAY_SIZE = 32;
+
+/** Watches a set of file descriptors with select() and notifies the message
+    queue registered for each descriptor when it becomes ready. The watch
+    table is guarded by an internal mutex. */
+class FileWatcher
+{
+public:
+       /// Create an empty watcher with room for INITIAL_WATCHED_FILES_ARRAY_SIZE entries
+       FileWatcher();
+       /// Free the watch table
+       virtual ~FileWatcher();
+
+       /** @brief Run one select() pass over the watched descriptors.
+           @param Timeout Maximum time to block, in seconds
+           @returns The number of ready watches that have no queue attached,
+           or a negative value if select() fails */
+       int Wait(double Timeout = 0);
+       /** @brief Watch fd and notify queue when one of the selected events occurs.
+           @returns 0 on success */
+       int AddFileWatch(int fd, QueuePointer & queue, bool WatchRead = true, bool WatchWrite = false, bool WatchExcept = true);
+       /** @brief Remove one watch; call with the same arguments as when adding it.
+           @returns 0 if a matching watch was removed, -1 otherwise */
+       int RemoveFileWatch(int fd, QueuePointer & queue, bool WatchRead = true, bool WatchWrite = false, bool WatchExcept = true);
+       /** @brief Watch fd without a queue; readiness is reported only through
+           the return value of Wait(). */
+       int AddFileWatch(int fd, bool WatchRead = true, bool WatchWrite = false, bool WatchExcept = true);
+       /** @brief Remove one watch that was added without a queue. */
+       int RemoveFileWatch(int fd, bool WatchRead = true, bool WatchWrite = false, bool WatchExcept = true);
+
+private:
+       /// Table of watches, allocated with calloc()/realloc()
+       struct fd_driver_pair * WatchedFiles;
+       /// Number of entries the table has room for
+       size_t WatchedFilesArraySize;
+       /// Number of leading table entries in use (including freed slots with fd < 0)
+       size_t WatchedFilesArrayCount;
+
+    /** @brief Lock access to watcher internals. */
+    virtual void Lock(void);
+    /** @brief Unlock access to watcher internals. */
+    virtual void Unlock(void);
+    /// Mutex taken by Lock() and released by Unlock() to guard the watch table.
+    pthread_mutex_t lock;
+
+};
+
+#endif /* FILEWATCHER_H_ */


Property changes on: 
code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Modified: code/player/branches/release-2-1-patches/libplayercore/globals.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/globals.cc   
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/globals.cc   
2008-06-10 05:16:08 UTC (rev 6501)
@@ -49,6 +49,7 @@
 
 #include <libplayercore/devicetable.h>
 #include <libplayercore/drivertable.h>
+#include <libplayercore/filewatcher.h>
 #include <libplayercore/playertime.h>
 #include <libplayercore/wallclocktime.h>
 
@@ -62,11 +63,14 @@
 // this table holds all the currently *available* drivers
 DriverTable* driverTable;
 
-// the global PlayerTime object has a method 
+// the global PlayerTime object has a method
 //   int GetTime(struct timeval*)
 // which everyone must use to get the current time
 PlayerTime* GlobalTime;
 
+// global class for watching for changes in files and sockets
+FileWatcher* fileWatcher;
+
 char playerversion[32];
 
 bool player_quit;
@@ -86,6 +90,7 @@
   deviceTable = new DeviceTable();
   driverTable = new DriverTable();
   GlobalTime = new WallclockTime();
+  fileWatcher = new FileWatcher();
   strncpy(playerversion, VERSION, sizeof(playerversion));
   player_quit = false;
   player_quiet_startup = false;
@@ -97,12 +102,10 @@
 void
 player_globals_fini()
 {
-  if(deviceTable)
-    delete deviceTable;
-  if(driverTable)
-    delete driverTable;
-  if(GlobalTime)
-    delete GlobalTime;
+  delete deviceTable;
+  delete driverTable;
+  delete GlobalTime;
+  delete fileWatcher;
 #if HAVE_PLAYERSD
   if(globalSD)
     player_sd_fini(globalSD);

Modified: code/player/branches/release-2-1-patches/libplayercore/globals.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/globals.h    
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/globals.h    
2008-06-10 05:16:08 UTC (rev 6501)
@@ -42,11 +42,13 @@
 class DeviceTable;
 class PlayerTime;
 class DriverTable;
+class FileWatcher;
 struct player_sd;
 
 extern DeviceTable* deviceTable;
 extern PlayerTime* GlobalTime;
 extern DriverTable* driverTable;
+extern FileWatcher* fileWatcher;
 extern char playerversion[];
 extern bool player_quit;
 extern bool player_quiet_startup;

Modified: code/player/branches/release-2-1-patches/libplayercore/playercore.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/playercore.h 
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/playercore.h 
2008-06-10 05:16:08 UTC (rev 6501)
@@ -46,6 +46,7 @@
 #include <libplayercore/driver.h>
 #include <libplayercore/drivertable.h>
 #include <libplayercore/error.h>
+#include <libplayercore/filewatcher.h>
 #include <libplayercore/globals.h>
 #include <libplayercore/interface_util.h>
 #include <libplayercore/message.h>

Modified: code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc  
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc  
2008-06-10 05:16:08 UTC (rev 6501)
@@ -189,6 +189,9 @@
     // set up for later use of poll() to accept() connections on this port
     this->listen_ufds[i].fd = this->listeners[i].fd;
     this->listen_ufds[i].events = POLLIN;
+
+    // set up for later use by global file watcher
+    fileWatcher->AddFileWatch(this->listeners[i].fd);
   }
 
   return(0);
@@ -248,6 +251,10 @@
   this->client_ufds[j].fd = this->clients[j].fd;
   this->client_ufds[j].events = POLLIN;
 
+  // set up for later use by global file watcher
+  fileWatcher->AddFileWatch(this->client_ufds[j].fd);
+
+
   // Create an outgoing queue for this client
   this->clients[j].queue =
           QueuePointer(true,PLAYER_MSGQUEUE_DEFAULT_MAXLEN);
@@ -390,6 +397,8 @@
   free(this->clients[cli].dev_subs);
   if(close(this->clients[cli].fd) < 0)
     PLAYER_WARN1("close() failed: %s", strerror(errno));
+  fileWatcher->RemoveFileWatch(this->clients[cli].fd);
+
   this->clients[cli].fd = -1;
   this->clients[cli].valid = 0;
   this->clients[cli].queue = QueuePointer();
@@ -746,8 +755,8 @@
     int ret = pthread_mutex_trylock(&clients_mutex);
     assert (ret == EBUSY);
   }
-       
-       
+
+
   if(!have_lock)
     Lock();
 
@@ -1328,13 +1337,13 @@
 }
 
 
-void 
+void
 PlayerTCP::Lock()
 {
   pthread_mutex_lock(&clients_mutex);
 }
 
-void 
+void
 PlayerTCP::Unlock()
 {
   pthread_mutex_unlock(&clients_mutex);

Modified: code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc  
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc  
2008-06-10 05:16:08 UTC (rev 6501)
@@ -176,6 +176,10 @@
     // set up for later use of poll() to accept() connections on this port
     this->listen_ufds[i].fd = this->listeners[i].fd;
     this->listen_ufds[i].events = POLLIN;
+
+    // set up for later use by global file watcher
+    fileWatcher->AddFileWatch(this->listeners[i].fd);
+
   }
 
   return(0);
@@ -271,6 +275,7 @@
     }
   }
   free(this->clients[cli].dev_subs);
+  fileWatcher->RemoveFileWatch(this->clients[cli].fd);
   this->clients[cli].fd = -1;
   this->clients[cli].valid = 0;
   // FIXME
@@ -809,7 +814,7 @@
           // update the message size and send it off
           hdr.size = decode_msglen;
           void * msg_data = hdr.size? this->decode_readbuffer: NULL;
-          
+
           if(hdr.addr.interf == PLAYER_PLAYER_CODE)
           {
             Message* msg = new Message(hdr, msg_data, client->queue);

Modified: 
code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc
===================================================================
--- code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc 
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc 
2008-06-10 05:16:08 UTC (rev 6501)
@@ -21,7 +21,7 @@
 /** @ingroup drivers Drivers */
 /** @{ */
 /*
- * 
+ *
 The tcpstream driver is based on the serialstream driver. It reads from a 
socket
 continuously and publishes the data. Currently this is usable with the 
SickS3000 driver
 and the Nav200 driver. This driver does no interpretation of data output, 
merely reading
@@ -44,7 +44,7 @@
 
 - PLAYER_LASER_REQ_GET_GEOM
 - PLAYER_LASER_REQ_GET_CONFIG
-  
+
 @par Configuration file options
 
 - ip (string)
@@ -58,9 +58,9 @@
 - buffer_size (integer)
   - The size of the buffer to be used when reading, this is the maximum that 
can be read in one read command
   - Default 4096
-      
[EMAIL PROTECTED] Example 
 
[EMAIL PROTECTED] Example
+
 @verbatim
 driver
 (
@@ -79,12 +79,12 @@
 
 @endverbatim
 
[EMAIL PROTECTED] David Olsen, Toby Collett, Inro Technologies 
[EMAIL PROTECTED] David Olsen, Toby Collett, Inro Technologies
 
 */
 /** @} */
-  
 
+
 // ONLY if you need something that was #define'd as a result of configure
 // (e.g., HAVE_CFMAKERAW), then #include <config.h>, like so:
 /*
@@ -116,22 +116,10 @@
 #include <libplayercore/playercore.h>
 
 #define DEFAULT_TCP_OPAQUE_BUFFER_SIZE 4096
-#define DEFAULT_TCP_OPAQUE_IP "10.99.10.6"
+#define DEFAULT_TCP_OPAQUE_IP "127.0.0.1"
 #define DEFAULT_TCP_OPAQUE_PORT 4002
 
 
////////////////////////////////////////////////////////////////////////////////
-// Device codes
-
-#define STX     0x02
-#define ACK     0xA0
-#define NACK    0x92
-#define CRC16_GEN_POL 0x8005
-
-////////////////////////////////////////////////////////////////////////////////
-// Error macros
-#define RETURN_ERROR(erc, m) {PLAYER_ERROR(m); return erc;}
-
-////////////////////////////////////////////////////////////////////////////////
 // The class for the driver
 class TCPStream : public Driver
 {
@@ -144,7 +132,7 @@
     // Must implement the following methods.
     virtual int Setup();
     virtual int Shutdown();
-    
+
     // This method will be invoked on each incoming message
     virtual int ProcessMessage(QueuePointer &resp_queue,
                                player_msghdr * hdr,
@@ -165,25 +153,19 @@
     // Close the terminal
     // Returns 0 on success
     virtual int CloseTerm();
-    
+
   protected:
          int sock;
-    
+
     uint8_t * rx_buffer;
-    //unsigned int rx_buffer_size;
-    
-    struct termios oldtio;
-    
-    // opaque device file descriptor
-    //int opaque_fd; 
-    
+
     // Properties
     IntProperty buffer_size;
     StringProperty ip;
     IntProperty port;
-    
+
     bool connected;
-    
+
     // This is the data we store and send
     player_opaque_data_t mData;
 
@@ -222,12 +204,12 @@
        this->RegisterProperty ("buffer_size", &this->buffer_size, cf, section);
        this->RegisterProperty ("ip", &this->ip, cf, section);
        this->RegisterProperty ("port", &this->port, cf, section);
-       
+
        rx_buffer = new uint8_t[buffer_size];
        assert(rx_buffer);
-       
+
        connected = false;
-       
+
        return;
 }
 
@@ -262,7 +244,7 @@
 {
   // Stop and join the driver thread
   StopThread();
-  
+
   CloseTerm();
 
   PLAYER_MSG0(2, "TCP Opaque Driver Shutdown");
@@ -273,21 +255,21 @@
 int TCPStream::ProcessMessage(QueuePointer & resp_queue,
                                  player_msghdr* hdr,
                                  void* data)
-{  
+{
        // Process messages here.  Send a response if necessary, using 
Publish().
        // If you handle the message successfully, return 0.  Otherwise,
        // return -1, and a NACK will be sent for you, if a response is 
required.
-       
+
        if (!connected)
        {
                PLAYER_MSG0(2, "TCP reconnecting");
                OpenTerm();
        }
-       
+
        if (Message::MatchMessage (hdr, PLAYER_MSGTYPE_CMD, 
PLAYER_OPAQUE_CMD_DATA, this->device_addr))
        {
            player_opaque_data_t * recv = reinterpret_cast<player_opaque_data_t 
* > (data);
-           
+
            if (recv->data_count) // If there is something to send.
            {
                int result;
@@ -302,7 +284,7 @@
                        CloseTerm();
                }
            }
-           
+
            return (0);
        }
 
@@ -318,8 +300,7 @@
   // The main loop; interact with the device here
   for(;;)
   {
-    // test if we are supposed to cancel
-    pthread_testcancel();
+    Wait(1);
 
     // Process incoming messages.  TCPStream::ProcessMessage() is
     // called on each message.
@@ -327,17 +308,14 @@
 
     if (connected)
     {
-       // Reads the data from the tcp server and then publishes it
-       ReadData();
+      // Reads the data from the tcp server and then publishes it
+      ReadData();
     }
     else
     {
-       PLAYER_MSG0(2, "TCP reconnecting");
-       OpenTerm();
+      PLAYER_MSG0(2, "TCP reconnecting");
+      OpenTerm();
     }
-
-    // Sleep (you might, for example, block on a read() instead)
-    usleep(1000);
   }
 }
 
@@ -349,7 +327,7 @@
        sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (sock < 0)
                PLAYER_ERROR("Failed to create socket.");
-       
+
        sockaddr_in address;
        memset(&address, 0, sizeof(address));
        address.sin_family = AF_INET;
@@ -357,15 +335,16 @@
        address.sin_port = htons(port.GetValue());
        if (connect(sock, (sockaddr*) &address, sizeof(address)) < 0)
                PLAYER_ERROR("Failed to connect");
-       
+
        int flags = fcntl(sock, F_GETFL);
        if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
                PLAYER_ERROR("Error changing socket to be non-blocking");
-       
+
        PLAYER_MSG0(2, "TCP Opaque Driver connected");
-  
+
        connected = true;
-       
+       AddFileWatch(sock);
+
   return 0;
 }
 
@@ -375,7 +354,9 @@
 //
 int TCPStream::CloseTerm()
 {
+  RemoveFileWatch(sock);
   close(sock);
+
   connected = false;
   return 0;
 }

Modified: code/player/branches/release-2-1-patches/server/server.cc
===================================================================
--- code/player/branches/release-2-1-patches/server/server.cc   2008-06-10 
04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/server/server.cc   2008-06-10 
05:16:08 UTC (rev 6501)
@@ -2,8 +2,8 @@
  *  Player - One Hell of a Robot Server
  *  Copyright (C) 2005 -
  *     Brian Gerkey
- *                      
  *
+ *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
@@ -110,7 +110,7 @@
 void PrintVersion();
 void PrintCopyrightMsg();
 void PrintUsage();
-int ParseArgs(int* port, int* debuglevel, 
+int ParseArgs(int* port, int* debuglevel,
               char** cfgfilename, int* gz_serverid,
               int argc, char** argv);
 void Quit(int signum);
@@ -146,7 +146,7 @@
     PLAYER_ERROR1("signal() failed: %s", strerror(errno));
     exit(-1);
   }
-  
+
   player_globals_init();
   player_register_drivers();
   playerxdr_ftable_init();
@@ -294,6 +294,9 @@
 
   while(!player_quit)
   {
+    // wait until something other than driver requested watches happens
+    fileWatcher->Wait(0.1);
+
     if(ptcp->Accept(0) < 0)
     {
       PLAYER_ERROR("failed while accepting new TCP connections");
@@ -364,7 +367,7 @@
   fprintf(stderr,"* Player comes with ABSOLUTELY NO WARRANTY.  This is free 
software, and you\n* are welcome to redistribute it under certain conditions; 
see COPYING\n* for details.\n\n");
 }
 
-void 
+void
 PrintUsage()
 {
   int maxlen=66;
@@ -395,13 +398,13 @@
 }
 
 
-int 
+int
 ParseArgs(int* port, int* debuglevel, char** cfgfilename, int* gz_serverid,
           int argc, char** argv)
 {
   int ch;
   const char* optflags = "d:p:hq";
-  
+
   // Get letter options
   while((ch = getopt(argc, argv, optflags)) != -1)
   {
@@ -418,7 +421,7 @@
         break;
       case '?':
       case ':':
-      case 'h':        
+      case 'h':
       default:
         return(-1);
     }
@@ -426,7 +429,7 @@
 
   if(optind >= argc)
     return(-1);
-  
+
   *cfgfilename = argv[optind];
 
   return(0);


This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.

-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Playerstage-commit mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/playerstage-commit

Reply via email to