Revision: 6501
http://playerstage.svn.sourceforge.net/playerstage/?rev=6501&view=rev
Author: thjc
Date: 2008-06-09 22:16:08 -0700 (Mon, 09 Jun 2008)
Log Message:
-----------
Added a file watcher class which allows drivers to delegate watching a file to
the server. Removes the need for non-blocking reads followed by a sleep (the
non-blocking read is still a good idea, but can now use Wait() instead of the
sleep).
Modified Paths:
--------------
code/player/branches/release-2-1-patches/libplayercore/Makefile.am
code/player/branches/release-2-1-patches/libplayercore/driver.cc
code/player/branches/release-2-1-patches/libplayercore/driver.h
code/player/branches/release-2-1-patches/libplayercore/globals.cc
code/player/branches/release-2-1-patches/libplayercore/globals.h
code/player/branches/release-2-1-patches/libplayercore/playercore.h
code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc
code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc
code/player/branches/release-2-1-patches/server/server.cc
Added Paths:
-----------
code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
Modified: code/player/branches/release-2-1-patches/libplayercore/Makefile.am
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/Makefile.am
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/Makefile.am
2008-06-10 05:16:08 UTC (rev 6501)
@@ -21,6 +21,7 @@
configfile.cc configfile.h \
playercommon.h player.h playerconfig.h.in \
message.h message.cc \
+ filewatcher.cc filewatcher.h \
wallclocktime.cc wallclocktime.h playertime.h \
plugins.cc plugins.h \
globals.cc globals.h \
@@ -43,6 +44,7 @@
devicetable.h \
driver.h \
drivertable.h \
+ filewatcher.h \
globals.h \
interface_util.h \
message.h \
Modified: code/player/branches/release-2-1-patches/libplayercore/driver.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/driver.cc
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/driver.cc
2008-06-10 05:16:08 UTC (rev 6501)
@@ -63,6 +63,7 @@
#include <libplayercore/devicetable.h>
#include <libplayercore/configfile.h>
#include <libplayercore/globals.h>
+#include <libplayercore/filewatcher.h>
#include <libplayercore/property.h>
#include <libplayercore/interface_util.h>
@@ -228,10 +229,10 @@
}
void
-Driver::Publish(player_devaddr_t addr,
- uint8_t type,
+Driver::Publish(player_devaddr_t addr,
+ uint8_t type,
uint8_t subtype,
- void* src,
+ void* src,
size_t deprecated,
double* timestamp,
bool copy)
@@ -304,6 +305,19 @@
return( shutdownResult );
}
+/** @brief Wake up the driver if the specified event occurs on the file
descriptor */
+int Driver::AddFileWatch(int fd, bool ReadWatch , bool WriteWatch , bool
ExceptWatch )
+{
+ return
fileWatcher->AddFileWatch(fd,InQueue,ReadWatch,WriteWatch,ExceptWatch);
+}
+
+/** @brief Remove a previously added watch, call with the same arguments as
when adding the watch */
+int Driver::RemoveFileWatch(int fd, bool ReadWatch , bool WriteWatch , bool
ExceptWatch )
+{
+ return
fileWatcher->RemoveFileWatch(fd,InQueue,ReadWatch,WriteWatch,ExceptWatch);
+}
+
+
/* start a thread that will invoke Main() */
void
Driver::StartThread(void)
Modified: code/player/branches/release-2-1-patches/libplayercore/driver.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/driver.h
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/driver.h
2008-06-10 05:16:08 UTC (rev 6501)
@@ -146,7 +146,7 @@
@returns 0 on success, non-zero otherwise. */
int AddInterface(player_devaddr_t *addr, ConfigFile * cf, int section, int
code, char * key = NULL);
-
+
/** @brief Set/reset error code */
void SetError(int code) {this->error = code;}
@@ -154,13 +154,20 @@
Call this method to block until a new message arrives on
Driver::InQueue. This method will return true immediately if at least
- one message is already waiting.
-
+ one message is already waiting.
+
If TimeOut is set to a positive value this method will return false if the
timeout occurs before and update is recieved.
*/
bool Wait(double TimeOut=0.0) { return this->InQueue->Wait(); }
+ /** @brief Wake up the driver if the specified event occurs on the file
descriptor */
+ int AddFileWatch(int fd, bool ReadWatch = true, bool WriteWatch = false,
bool ExceptWatch = true);
+
+ /** @brief Remove a previously added watch, call with the same arguments
as when adding the watch */
+ int RemoveFileWatch(int fd, bool ReadWatch = true, bool WriteWatch =
false, bool ExceptWatch = true);
+
+
public:
/** @brief The driver's thread.
@@ -203,10 +210,10 @@
bool copy=true);
/** @brief Publish a message via one of this driver's interfaces.
-
+
This form of Publish will assemble the message header for you.
The message is broadcast to all interested parties
-
+
@param addr The origin address
@param type The message type
@param subtype The message subtype
@@ -215,23 +222,23 @@
@param timestamp Timestamp for the message body (if NULL, then the
current time will be filled in)
@param copy if set to false the data will be claimed and the caller
should no longer use or free it */
- virtual void Publish(player_devaddr_t addr,
- uint8_t type,
+ virtual void Publish(player_devaddr_t addr,
+ uint8_t type,
uint8_t subtype,
- void* src=NULL,
+ void* src=NULL,
size_t deprecated=0,
double* timestamp=NULL,
bool copy=true);
-
-
+
+
/** @brief Publish a message via one of this driver's interfaces.
Use this form of Publish if you already have the message header
assembled and have a target queue to send to.
@param queue the target queue.
@param hdr The message header
- @param src The message body
+ @param src The message body
@param copy if set to false the data will be claimed and the caller should
no longer use or free it */
virtual void Publish(QueuePointer &queue,
player_msghdr_t* hdr,
@@ -243,8 +250,8 @@
Use this form of Publish if you already have the message header
assembled and wish to broadcast the message to all subscribed parties.
@param hdr The message header
- @param src The message body
- @param copy if set to false the data will be claimed and the caller should
no longer use or free it */
+ @param src The message body
+ @param copy if set to false the data will be claimed and the caller should
no longer use or free it */
virtual void Publish(player_msghdr_t* hdr,
void* src,
bool copy = true);
@@ -321,15 +328,15 @@
subscriptions to the driver; a driver MAY override them, but
usually won't. This alternative form includes the clients queue
so you can map future requests and unsubscriptions to a specific queue.
-
+
If this methods returns a value other than 1 then the other form of
subscribe wont be called
@param queue The queue of the subscribing client
@param addr Address of the device to subscribe to (the driver may
have more than one interface).
@returns Returns 0 on success, -ve on error and 1 for unimplemented. */
- virtual int Subscribe(QueuePointer &queue, player_devaddr_t addr) {return
1;};
-
+ virtual int Subscribe(QueuePointer &queue, player_devaddr_t addr) {return
1;};
+
/** @brief Unsubscribe from this driver.
The Subscribe() and Unsubscribe() methods are used to control
@@ -347,7 +354,7 @@
subscriptions to the driver; a driver MAY override them, but
usually won't.This alternative form includes the clients queue
so you can map future requests and unsubscriptions to a specific queue.
-
+
If this methods returns a value other than 1 then the other form of
subscribe wont be called
@param queue The queue of the subscribing client
@@ -355,7 +362,7 @@
have more than one interface).
@returns Returns 0 on success. */
virtual int Unsubscribe(QueuePointer &queue, player_devaddr_t addr)
{return 1;};
-
+
/** @brief Initialize the driver.
This function is called with the first client subscribes; it MUST
@@ -447,7 +454,6 @@
@param section Configuration file section that may define the property
value
@return True if the property was registered, false otherwise */
virtual bool RegisterProperty(Property *prop, ConfigFile* cf, int section);
-
};
Added: code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
(rev 0)
+++ code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
2008-06-10 05:16:08 UTC (rev 6501)
@@ -0,0 +1,188 @@
+/*
+ * FileWatcher.cc
+ *
+ * Created on: 10/06/2008
+ * Author: tcollett
+ */
+
+#include <libplayercore/filewatcher.h>
+#include <libplayercore/message.h>
+#include <libplayercore/error.h>
+#include <sys/time.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <error.h>
+#include <math.h>
+#include <string.h>
+
+FileWatcher::FileWatcher()
+{
+ WatchedFilesArraySize = INITIAL_WATCHED_FILES_ARRAY_SIZE;
+ WatchedFilesArrayCount = 0;
+ WatchedFiles = reinterpret_cast<struct fd_driver_pair *>
(calloc(WatchedFilesArraySize,sizeof(WatchedFiles[0])));
+ assert(WatchedFiles);
+ pthread_mutex_init(&this->lock,NULL);
+
+}
+
+FileWatcher::~FileWatcher()
+{
+ free(WatchedFiles);
+}
+
+void FileWatcher::Lock()
+{
+ pthread_mutex_lock(&lock);
+}
+
+void FileWatcher::Unlock()
+{
+ pthread_mutex_unlock(&lock);
+}
+
+
+int FileWatcher::Wait(double Timeout)
+{
+ Lock();
+ if (WatchedFilesArrayCount == 0)
+ {
+ Unlock();
+ return 0;
+ }
+
+ // intialise our FD sets for the select call
+ fd_set ReadFds,WriteFds,ExceptFds;
+ FD_ZERO(&ReadFds);
+ FD_ZERO(&WriteFds);
+ FD_ZERO(&ExceptFds);
+
+ int maxfd = 0;
+
+ for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+ {
+ if (WatchedFiles[ii].fd >= 0)
+ {
+ if (WatchedFiles[ii].fd > maxfd)
+ maxfd = WatchedFiles[ii].fd;
+ if (WatchedFiles[ii].Read)
+ FD_SET(WatchedFiles[ii].fd,&ReadFds);
+ if (WatchedFiles[ii].Write)
+ FD_SET(WatchedFiles[ii].fd,&WriteFds);
+ if (WatchedFiles[ii].Except)
+ FD_SET(WatchedFiles[ii].fd,&ExceptFds);
+ }
+ }
+
+ struct timeval t;
+ t.tv_sec = static_cast<int> (floor(Timeout));
+ t.tv_usec = static_cast<int> ((Timeout - static_cast<int>
(floor(Timeout))) * 1e6);
+ Unlock();
+ int ret = select (maxfd+1,&ReadFds,&WriteFds,&ExceptFds,&t);
+
+ if (ret < 0)
+ {
+ PLAYER_ERROR2("Select called failed in File Watcher: %d
%s",errno,strerror(errno));
+ return ret;
+ }
+
+ int queueless_count = 0;
+
+ Lock();
+ for (unsigned int ii = 0; ii < WatchedFilesArrayCount &&
static_cast<int> (ii) < maxfd; ++ii)
+ {
+ int fd = WatchedFiles[ii].fd;
+ QueuePointer &q = WatchedFiles[ii].queue;
+ if (fd > 0 && fd < maxfd)
+ {
+ if ((WatchedFiles[ii].Read && FD_ISSET(fd,&ReadFds)) ||
+ (WatchedFiles[ii].Write &&
FD_ISSET(fd,&WriteFds)) ||
+ (WatchedFiles[ii].Except &&
FD_ISSET(fd,&ExceptFds)))
+ {
+ if (q != NULL)
+ q->DataAvailable();
+ else
+ queueless_count++;
+
+ }
+ }
+ }
+ Unlock();
+
+ return queueless_count;
+}
+
+int FileWatcher::AddFileWatch(int fd, bool WatchRead, bool WatchWrite, bool
WatchExcept)
+{
+ QueuePointer q;
+ return AddFileWatch(fd, q,WatchRead,WatchWrite,WatchExcept);
+}
+
+
+int FileWatcher::AddFileWatch(int fd, QueuePointer & queue, bool WatchRead,
bool WatchWrite, bool WatchExcept)
+{
+ Lock();
+ // find the first available file descriptor
+ struct fd_driver_pair *next_entry = NULL;
+ if (WatchedFilesArrayCount < WatchedFilesArraySize)
+ {
+ next_entry = &WatchedFiles[WatchedFilesArrayCount];
+ WatchedFilesArrayCount++;
+ }
+ else
+ {
+ // first see if there is an empty spot in the list
+ for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+ {
+ if (WatchedFiles[ii].fd < 0)
+ {
+ next_entry = &WatchedFiles[ii];
+ break;
+ }
+ }
+ if (next_entry == NULL)
+ {
+ // otherwise we allocate some more room for the array
+ WatchedFilesArraySize*=2;
+ WatchedFiles = reinterpret_cast<struct fd_driver_pair
*> (realloc(WatchedFiles,sizeof(WatchedFiles[0])*WatchedFilesArraySize));
+ next_entry = &WatchedFiles[WatchedFilesArrayCount];
+
+ }
+ }
+
+ next_entry->fd = fd;
+ next_entry->queue = queue;
+ next_entry->Read = WatchRead;
+ next_entry->Write = WatchWrite;
+ next_entry->Except = WatchExcept;
+ Unlock();
+ return 0;
+}
+
+int FileWatcher::RemoveFileWatch(int fd, bool WatchRead, bool WatchWrite, bool
WatchExcept)
+{
+ QueuePointer q;
+ return RemoveFileWatch(fd, q,WatchRead,WatchWrite,WatchExcept);
+}
+
+
+int FileWatcher::RemoveFileWatch(int fd, QueuePointer &queue, bool WatchRead,
bool WatchWrite, bool WatchExcept)
+{
+ Lock();
+ // this finds the first matching entry and removes it. It only removes
one entry so call remove for every add
+ for (unsigned int ii = 0; ii < WatchedFilesArrayCount; ++ii)
+ {
+ if (WatchedFiles[ii].fd == fd &&
+ WatchedFiles[ii].queue == queue &&
+ WatchedFiles[ii].Read == WatchRead &&
+ WatchedFiles[ii].Write == WatchWrite &&
+ WatchedFiles[ii].Except == WatchExcept)
+ {
+ WatchedFiles[ii].fd = -1;
+ Unlock();
+ return 0;
+ }
+ }
+ Unlock();
+ return -1;
+}
+
Property changes on:
code/player/branches/release-2-1-patches/libplayercore/filewatcher.cc
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
(rev 0)
+++ code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
2008-06-10 05:16:08 UTC (rev 6501)
@@ -0,0 +1,56 @@
+/*
+ * filewatcher.h
+ *
+ * Created on: 10/06/2008
+ * Author: tcollett
+ */
+
+#ifndef FILEWATCHER_H_
+#define FILEWATCHER_H_
+
+#include <sys/select.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <libplayercore/message.h>
+
+class MessageQueue;
+
+struct fd_driver_pair
+{
+ int fd;
+ QueuePointer queue;
+ bool Read;
+ bool Write;
+ bool Except;
+};
+
+const size_t INITIAL_WATCHED_FILES_ARRAY_SIZE = 32;
+
+class FileWatcher
+{
+public:
+ FileWatcher();
+ virtual ~FileWatcher();
+
+ int Wait(double Timeout = 0);
+ int AddFileWatch(int fd, QueuePointer & queue, bool WatchRead = true,
bool WatchWrite = false, bool WatchExcept = true);
+ int RemoveFileWatch(int fd, QueuePointer & queue, bool WatchRead =
true, bool WatchWrite = false, bool WatchExcept = true);
+ int AddFileWatch(int fd, bool WatchRead = true, bool WatchWrite =
false, bool WatchExcept = true);
+ int RemoveFileWatch(int fd, bool WatchRead = true, bool WatchWrite =
false, bool WatchExcept = true);
+
+private:
+ struct fd_driver_pair * WatchedFiles;
+ size_t WatchedFilesArraySize;
+ size_t WatchedFilesArrayCount;
+
+ /** @brief Lock access to watcher internals. */
+ virtual void Lock(void);
+ /** @brief Unlock access to watcher internals. */
+ virtual void Unlock(void);
+ /// Used to lock access to Data.
+ pthread_mutex_t lock;
+
+};
+
+#endif /* FILEWATCHER_H_ */
Property changes on:
code/player/branches/release-2-1-patches/libplayercore/filewatcher.h
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: code/player/branches/release-2-1-patches/libplayercore/globals.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/globals.cc
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/globals.cc
2008-06-10 05:16:08 UTC (rev 6501)
@@ -49,6 +49,7 @@
#include <libplayercore/devicetable.h>
#include <libplayercore/drivertable.h>
+#include <libplayercore/filewatcher.h>
#include <libplayercore/playertime.h>
#include <libplayercore/wallclocktime.h>
@@ -62,11 +63,14 @@
// this table holds all the currently *available* drivers
DriverTable* driverTable;
-// the global PlayerTime object has a method
+// the global PlayerTime object has a method
// int GetTime(struct timeval*)
// which everyone must use to get the current time
PlayerTime* GlobalTime;
+// global class for watching for changes in files and sockets
+FileWatcher* fileWatcher;
+
char playerversion[32];
bool player_quit;
@@ -86,6 +90,7 @@
deviceTable = new DeviceTable();
driverTable = new DriverTable();
GlobalTime = new WallclockTime();
+ fileWatcher = new FileWatcher();
strncpy(playerversion, VERSION, sizeof(playerversion));
player_quit = false;
player_quiet_startup = false;
@@ -97,12 +102,10 @@
void
player_globals_fini()
{
- if(deviceTable)
- delete deviceTable;
- if(driverTable)
- delete driverTable;
- if(GlobalTime)
- delete GlobalTime;
+ delete deviceTable;
+ delete driverTable;
+ delete GlobalTime;
+ delete fileWatcher;
#if HAVE_PLAYERSD
if(globalSD)
player_sd_fini(globalSD);
Modified: code/player/branches/release-2-1-patches/libplayercore/globals.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/globals.h
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/globals.h
2008-06-10 05:16:08 UTC (rev 6501)
@@ -42,11 +42,13 @@
class DeviceTable;
class PlayerTime;
class DriverTable;
+class FileWatcher;
struct player_sd;
extern DeviceTable* deviceTable;
extern PlayerTime* GlobalTime;
extern DriverTable* driverTable;
+extern FileWatcher* fileWatcher;
extern char playerversion[];
extern bool player_quit;
extern bool player_quiet_startup;
Modified: code/player/branches/release-2-1-patches/libplayercore/playercore.h
===================================================================
--- code/player/branches/release-2-1-patches/libplayercore/playercore.h
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayercore/playercore.h
2008-06-10 05:16:08 UTC (rev 6501)
@@ -46,6 +46,7 @@
#include <libplayercore/driver.h>
#include <libplayercore/drivertable.h>
#include <libplayercore/error.h>
+#include <libplayercore/filewatcher.h>
#include <libplayercore/globals.h>
#include <libplayercore/interface_util.h>
#include <libplayercore/message.h>
Modified: code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayertcp/playertcp.cc
2008-06-10 05:16:08 UTC (rev 6501)
@@ -189,6 +189,9 @@
// set up for later use of poll() to accept() connections on this port
this->listen_ufds[i].fd = this->listeners[i].fd;
this->listen_ufds[i].events = POLLIN;
+
+ // set up for later use by global file watcher
+ fileWatcher->AddFileWatch(this->listeners[i].fd);
}
return(0);
@@ -248,6 +251,10 @@
this->client_ufds[j].fd = this->clients[j].fd;
this->client_ufds[j].events = POLLIN;
+ // set up for later use by global file watcher
+ fileWatcher->AddFileWatch(this->client_ufds[j].fd);
+
+
// Create an outgoing queue for this client
this->clients[j].queue =
QueuePointer(true,PLAYER_MSGQUEUE_DEFAULT_MAXLEN);
@@ -390,6 +397,8 @@
free(this->clients[cli].dev_subs);
if(close(this->clients[cli].fd) < 0)
PLAYER_WARN1("close() failed: %s", strerror(errno));
+ fileWatcher->RemoveFileWatch(this->clients[cli].fd);
+
this->clients[cli].fd = -1;
this->clients[cli].valid = 0;
this->clients[cli].queue = QueuePointer();
@@ -746,8 +755,8 @@
int ret = pthread_mutex_trylock(&clients_mutex);
assert (ret == EBUSY);
}
-
-
+
+
if(!have_lock)
Lock();
@@ -1328,13 +1337,13 @@
}
-void
+void
PlayerTCP::Lock()
{
pthread_mutex_lock(&clients_mutex);
}
-void
+void
PlayerTCP::Unlock()
{
pthread_mutex_unlock(&clients_mutex);
Modified: code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc
===================================================================
--- code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/libplayertcp/playerudp.cc
2008-06-10 05:16:08 UTC (rev 6501)
@@ -176,6 +176,10 @@
// set up for later use of poll() to accept() connections on this port
this->listen_ufds[i].fd = this->listeners[i].fd;
this->listen_ufds[i].events = POLLIN;
+
+ // set up for later use by global file watcher
+ fileWatcher->AddFileWatch(this->listeners[i].fd);
+
}
return(0);
@@ -271,6 +275,7 @@
}
}
free(this->clients[cli].dev_subs);
+ fileWatcher->RemoveFileWatch(this->clients[cli].fd);
this->clients[cli].fd = -1;
this->clients[cli].valid = 0;
// FIXME
@@ -809,7 +814,7 @@
// update the message size and send it off
hdr.size = decode_msglen;
void * msg_data = hdr.size? this->decode_readbuffer: NULL;
-
+
if(hdr.addr.interf == PLAYER_PLAYER_CODE)
{
Message* msg = new Message(hdr, msg_data, client->queue);
Modified:
code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc
===================================================================
--- code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc
2008-06-10 04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/server/drivers/opaque/tcpstream.cc
2008-06-10 05:16:08 UTC (rev 6501)
@@ -21,7 +21,7 @@
/** @ingroup drivers Drivers */
/** @{ */
/*
- *
+ *
The tcpstream driver is based on the serialstream driver. It reads from a
socket
continuously and publishes the data. Currently this is usable with the
SickS3000 driver
and the Nav200 driver. This driver does no interpretation of data output,
merely reading
@@ -44,7 +44,7 @@
- PLAYER_LASER_REQ_GET_GEOM
- PLAYER_LASER_REQ_GET_CONFIG
-
+
@par Configuration file options
- ip (string)
@@ -58,9 +58,9 @@
- buffer_size (integer)
- The size of the buffer to be used when reading, this is the maximum that
can be read in one read command
- Default 4096
-
-@par Example
+@par Example
+
@verbatim
driver
(
@@ -79,12 +79,12 @@
@endverbatim
-@author David Olsen, Toby Collett, Inro Technologies
+@author David Olsen, Toby Collett, Inro Technologies
*/
/** @} */
-
+
// ONLY if you need something that was #define'd as a result of configure
// (e.g., HAVE_CFMAKERAW), then #include <config.h>, like so:
/*
@@ -116,22 +116,10 @@
#include <libplayercore/playercore.h>
#define DEFAULT_TCP_OPAQUE_BUFFER_SIZE 4096
-#define DEFAULT_TCP_OPAQUE_IP "10.99.10.6"
+#define DEFAULT_TCP_OPAQUE_IP "127.0.0.1"
#define DEFAULT_TCP_OPAQUE_PORT 4002
////////////////////////////////////////////////////////////////////////////////
-// Device codes
-
-#define STX 0x02
-#define ACK 0xA0
-#define NACK 0x92
-#define CRC16_GEN_POL 0x8005
-
-////////////////////////////////////////////////////////////////////////////////
-// Error macros
-#define RETURN_ERROR(erc, m) {PLAYER_ERROR(m); return erc;}
-
-////////////////////////////////////////////////////////////////////////////////
// The class for the driver
class TCPStream : public Driver
{
@@ -144,7 +132,7 @@
// Must implement the following methods.
virtual int Setup();
virtual int Shutdown();
-
+
// This method will be invoked on each incoming message
virtual int ProcessMessage(QueuePointer &resp_queue,
player_msghdr * hdr,
@@ -165,25 +153,19 @@
// Close the terminal
// Returns 0 on success
virtual int CloseTerm();
-
+
protected:
int sock;
-
+
uint8_t * rx_buffer;
- //unsigned int rx_buffer_size;
-
- struct termios oldtio;
-
- // opaque device file descriptor
- //int opaque_fd;
-
+
// Properties
IntProperty buffer_size;
StringProperty ip;
IntProperty port;
-
+
bool connected;
-
+
// This is the data we store and send
player_opaque_data_t mData;
@@ -222,12 +204,12 @@
this->RegisterProperty ("buffer_size", &this->buffer_size, cf, section);
this->RegisterProperty ("ip", &this->ip, cf, section);
this->RegisterProperty ("port", &this->port, cf, section);
-
+
rx_buffer = new uint8_t[buffer_size];
assert(rx_buffer);
-
+
connected = false;
-
+
return;
}
@@ -262,7 +244,7 @@
{
// Stop and join the driver thread
StopThread();
-
+
CloseTerm();
PLAYER_MSG0(2, "TCP Opaque Driver Shutdown");
@@ -273,21 +255,21 @@
int TCPStream::ProcessMessage(QueuePointer & resp_queue,
player_msghdr* hdr,
void* data)
-{
+{
// Process messages here. Send a response if necessary, using
Publish().
// If you handle the message successfully, return 0. Otherwise,
// return -1, and a NACK will be sent for you, if a response is
required.
-
+
if (!connected)
{
PLAYER_MSG0(2, "TCP reconnecting");
OpenTerm();
}
-
+
if (Message::MatchMessage (hdr, PLAYER_MSGTYPE_CMD,
PLAYER_OPAQUE_CMD_DATA, this->device_addr))
{
player_opaque_data_t * recv = reinterpret_cast<player_opaque_data_t
* > (data);
-
+
if (recv->data_count) // If there is something to send.
{
int result;
@@ -302,7 +284,7 @@
CloseTerm();
}
}
-
+
return (0);
}
@@ -318,8 +300,7 @@
// The main loop; interact with the device here
for(;;)
{
- // test if we are supposed to cancel
- pthread_testcancel();
+ Wait(1);
// Process incoming messages. TCPStream::ProcessMessage() is
// called on each message.
@@ -327,17 +308,14 @@
if (connected)
{
- // Reads the data from the tcp server and then publishes it
- ReadData();
+ // Reads the data from the tcp server and then publishes it
+ ReadData();
}
else
{
- PLAYER_MSG0(2, "TCP reconnecting");
- OpenTerm();
+ PLAYER_MSG0(2, "TCP reconnecting");
+ OpenTerm();
}
-
- // Sleep (you might, for example, block on a read() instead)
- usleep(1000);
}
}
@@ -349,7 +327,7 @@
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock < 0)
PLAYER_ERROR("Failed to create socket.");
-
+
sockaddr_in address;
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
@@ -357,15 +335,16 @@
address.sin_port = htons(port.GetValue());
if (connect(sock, (sockaddr*) &address, sizeof(address)) < 0)
PLAYER_ERROR("Failed to connect");
-
+
int flags = fcntl(sock, F_GETFL);
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
PLAYER_ERROR("Error changing socket to be non-blocking");
-
+
PLAYER_MSG0(2, "TCP Opaque Driver connected");
-
+
connected = true;
-
+ AddFileWatch(sock);
+
return 0;
}
@@ -375,7 +354,9 @@
//
int TCPStream::CloseTerm()
{
+ RemoveFileWatch(sock);
close(sock);
+
connected = false;
return 0;
}
Modified: code/player/branches/release-2-1-patches/server/server.cc
===================================================================
--- code/player/branches/release-2-1-patches/server/server.cc 2008-06-10
04:38:58 UTC (rev 6500)
+++ code/player/branches/release-2-1-patches/server/server.cc 2008-06-10
05:16:08 UTC (rev 6501)
@@ -2,8 +2,8 @@
* Player - One Hell of a Robot Server
* Copyright (C) 2005 -
* Brian Gerkey
- *
*
+ *
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
@@ -110,7 +110,7 @@
void PrintVersion();
void PrintCopyrightMsg();
void PrintUsage();
-int ParseArgs(int* port, int* debuglevel,
+int ParseArgs(int* port, int* debuglevel,
char** cfgfilename, int* gz_serverid,
int argc, char** argv);
void Quit(int signum);
@@ -146,7 +146,7 @@
PLAYER_ERROR1("signal() failed: %s", strerror(errno));
exit(-1);
}
-
+
player_globals_init();
player_register_drivers();
playerxdr_ftable_init();
@@ -294,6 +294,9 @@
while(!player_quit)
{
+ // wait until something other than driver requested watches happens
+ fileWatcher->Wait(0.1);
+
if(ptcp->Accept(0) < 0)
{
PLAYER_ERROR("failed while accepting new TCP connections");
@@ -364,7 +367,7 @@
fprintf(stderr,"* Player comes with ABSOLUTELY NO WARRANTY. This is free
software, and you\n* are welcome to redistribute it under certain conditions;
see COPYING\n* for details.\n\n");
}
-void
+void
PrintUsage()
{
int maxlen=66;
@@ -395,13 +398,13 @@
}
-int
+int
ParseArgs(int* port, int* debuglevel, char** cfgfilename, int* gz_serverid,
int argc, char** argv)
{
int ch;
const char* optflags = "d:p:hq";
-
+
// Get letter options
while((ch = getopt(argc, argv, optflags)) != -1)
{
@@ -418,7 +421,7 @@
break;
case '?':
case ':':
- case 'h':
+ case 'h':
default:
return(-1);
}
@@ -426,7 +429,7 @@
if(optind >= argc)
return(-1);
-
+
*cfgfilename = argv[optind];
return(0);
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Playerstage-commit mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/playerstage-commit