Author: mkolar
Date: 2007-06-01 16:56:01 +0000 (Fri, 01 Jun 2007)
New Revision: 13446
Added:
trunk/apps/CppFCPLib/
trunk/apps/CppFCPLib/DefaultValues.h
trunk/apps/CppFCPLib/Exceptions.cpp
trunk/apps/CppFCPLib/Exceptions.h
trunk/apps/CppFCPLib/JobTicket.cpp
trunk/apps/CppFCPLib/JobTicket.h
trunk/apps/CppFCPLib/Log.cpp
trunk/apps/CppFCPLib/Log.h
trunk/apps/CppFCPLib/Message.cpp
trunk/apps/CppFCPLib/Message.h
trunk/apps/CppFCPLib/Node.cpp
trunk/apps/CppFCPLib/Node.h
trunk/apps/CppFCPLib/NodeThread.cpp
trunk/apps/CppFCPLib/NodeThread.h
trunk/apps/CppFCPLib/Server.cpp
trunk/apps/CppFCPLib/Server.h
trunk/apps/CppFCPLib/TQueue.h
trunk/apps/CppFCPLib/main.cpp
Log:
Initial import of CppFCPLib to the repository
Added: trunk/apps/CppFCPLib/DefaultValues.h
===================================================================
--- trunk/apps/CppFCPLib/DefaultValues.h (rev 0)
+++ trunk/apps/CppFCPLib/DefaultValues.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,19 @@
+
+#ifndef DEFAULTVALUES_H_
+#define DEFAULTVALUES_H_
+
+#include <string>
+//#include "Log.h"
+
+// Default address of the node's FCP port (Server falls back to the same
+// values when it is given an empty host or a non-positive port).
+const std::string defaultFCPHost = "127.0.0.1";
+const int defaultFCPPort = 9481;
+// Default FProxy address -- presumably the node's web interface; not used
+// by the code in this import.
+const std::string defaultFProxyHost = "127.0.0.1";
+const int defaultFProxyPort = 8888;
+//const FCPLib::verbosityLevel defaultVerbosity = FCPLib::ERROR;
+const int inputBufferSize = 64 * 1024;
+const int outputBufferSize = 64 * 1024;
+const int maxSizeMessage = 8192;
+// Timeout handed to poll() in Server::dataAvailable(), in milliseconds.
+const int pollTimeout = 100;
+// One year expressed in seconds; used as the "practically forever" job
+// timeout. The previous value, 86400 * 365 * 1000 (31,536,000,000), does not
+// fit into a 32-bit int and overflowed. Seconds match JobTicket::wait(),
+// which measures elapsed time with time(0).
+const int oneyear = 86400 * 365;
+
+#endif
Added: trunk/apps/CppFCPLib/Exceptions.cpp
===================================================================
--- trunk/apps/CppFCPLib/Exceptions.cpp (rev 0)
+++ trunk/apps/CppFCPLib/Exceptions.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,31 @@
+#include "Exceptions.h"
+
+using namespace FCPLib;
+
+// Builds the exception from the current value of errno: the message is
+// strerror(errno) and the same errno value is stored in `error`.
+StdError::StdError()
+    : std::runtime_error(strerror(errno)), error(errno)
+{}
+
+// Builds the exception from an explicit error number: the message is
+// strerror(error_) and error_ is stored in `error`.
+StdError::StdError(int error_)
+    : std::runtime_error(strerror(error_)), error(error_)
+{}
+
+// Builds the message "<func> : <message> <errstring>" and records the
+// current errno in `error`.
+StdError::StdError(std::string &func, std::string &message, std::string &errstring)
+    : std::runtime_error(std::string(func).append(" : ").append(message).append(" ").append(errstring)),
+      error(errno)
+{}
+
+// C-string flavour of the three-part constructor: the message is
+// "<func> : <message> <errstring>" and the current errno is stored in `error`.
+StdError::StdError(const char *func, const char *message, const char *errstring)
+    : std::runtime_error(std::string(func).append(" : ").append(message).append(" ").append(errstring)),
+      error(errno)
+{}
+
+// Nothing to release; declared throw() to match std::runtime_error.
+StdError::~StdError() throw() {}
Added: trunk/apps/CppFCPLib/Exceptions.h
===================================================================
--- trunk/apps/CppFCPLib/Exceptions.h (rev 0)
+++ trunk/apps/CppFCPLib/Exceptions.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,22 @@
+
+#ifndef EXCEPTIONS_H__
+#define EXCEPTIONS_H__
+
+#include <stdexcept>
+#include <cerrno>
+
+namespace FCPLib {
+
+// Exception type used by the library for system-call failures. It is a
+// std::runtime_error whose what() text is either strerror() of an error
+// number or a "<func> : <message> <errstring>" string.
+class StdError : public std::runtime_error {
+ // Error number captured at construction (errno, or the value passed in).
+ // NOTE(review): there is no accessor for it in this class.
+ int error;
+public:
+ // Message and error number taken from the current errno.
+ StdError();
+ // Message and error number taken from error_.
+ StdError(int error_);
+ // Message built from the three parts; error number taken from errno.
+ StdError(std::string &func, std::string &message, std::string &errstring);
+ StdError(const char *func, const char *message, const char *errstring);
+ ~StdError() throw();
+};
+
+}
+
+#endif
Added: trunk/apps/CppFCPLib/JobTicket.cpp
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.cpp (rev 0)
+++ trunk/apps/CppFCPLib/JobTicket.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,125 @@
+
+#include "JobTicket.h"
+#include "DefaultValues.h"
+#include "zthread/Thread.h"
+#include <boost/lexical_cast.hpp>
+
+using namespace FCPLib;
+
+// Creates a ticket for command cmd_ under the identifier id_.
+//
+// All option flags start as false and the timeout as `oneyear`. Both mutexes
+// are acquired here: `lock` is released by putResult() and `reqSentLock` by
+// NodeThread::sendClientReq(), which is what wait() and waitTillReqSent()
+// block on.
+JobTicket::JobTicket(std::string id_, Message::MessagePtr &cmd_)
+    : id(id_),
+      cmd(cmd_),
+      async_(false),
+      keep(false),
+      waitTillSent_(false),
+      timeout_(oneyear),
+      isReprValid(false),
+//    hasStream(false)
+      timeQueued(0)   // previously indeterminate until the request was sent
+{
+    lock.acquire();
+    reqSentLock.acquire();
+}
+
+// Sets the async flag, invalidates the cached toString() text and returns
+// *this so that setters can be chained.
+JobTicket&
+JobTicket::async(bool async)
+{
+    async_ = async;
+    isReprValid = false;
+    return *this;
+}
+
+// Sets the keep flag, invalidates the cached toString() text and returns
+// *this so that setters can be chained.
+inline JobTicket&
+JobTicket::keepJob(bool keep_)
+{
+    keep = keep_;
+    isReprValid = false;
+    return *this;
+}
+// Sets the wait-till-sent flag, invalidates the cached toString() text and
+// returns *this so that setters can be chained.
+inline JobTicket&
+JobTicket::waitTillSent(bool wait_)
+{
+    waitTillSent_ = wait_;
+    isReprValid = false;
+    return *this;
+}
+// Stores the job timeout, invalidates the cached toString() text and
+// returns *this so that setters can be chained.
+inline JobTicket&
+JobTicket::timeout(int timeout)
+{
+    timeout_ = timeout;
+    isReprValid = false;
+    return *this;
+}
+
+// Returns the identifier this ticket was created with.
+const std::string&
+JobTicket::getId() const
+{
+    return this->id;
+}
+
+// Returns the header line of the command message carried by this ticket.
+const std::string&
+JobTicket::commandName() const
+{
+    const std::string &header = cmd->getHeader();
+    return header;
+}
+
+// Returns the command message serialized by Message::toString().
+const std::string&
+JobTicket::getMessageText() const
+{
+    const std::string &text = cmd->toString();
+    return text;
+}
+
+// Blocks the caller on this job.
+//
+// timeout == 0: polls `lock` until putResult() has released it, i.e. until
+//               a result is available, then releases it again and returns.
+// timeout  > 0: polls `reqSentLock` until the request has been dispatched
+//               (sendClientReq() releases it) or until `timeout` seconds
+//               have elapsed, whichever comes first.
+//
+// Fixes over the imported version: the timed branch did not compile
+// (missing semicolon, unqualified Thread, undefined `job`, string literal
+// split across lines) and it never stopped polling once the timeout had
+// expired.
+//
+// NOTE(review): as before, the timed branch keeps `reqSentLock` held once it
+// is acquired and does not go on to wait for the result -- confirm whether
+// that is intended.
+void
+JobTicket::wait(unsigned int timeout)
+{
+    if (!timeout) {
+        while (!lock.tryAcquire(100))
+            ZThread::Thread::sleep(100);
+        lock.release();
+        return;
+    }
+
+    const unsigned int then = (unsigned int) time(0);
+    while (!reqSentLock.tryAcquire(100)) {
+        const unsigned int elapsed = (unsigned int) time(0) - then;
+        if (elapsed >= timeout) {
+            // Out of patience: stop polling instead of spinning forever.
+            log().log(DEBUG, "wait:" + commandName() + ":" + getId() + ": timeout on send command");
+            return;
+        }
+        ZThread::Thread::sleep(1000);
+        log().log(DEBUG, "wait:" + commandName() + ":" + getId() + ": job not dispatched, timeout in " +
+            boost::lexical_cast<std::string>(timeout - elapsed));
+    }
+}
+
+// Blocks until `reqSentLock` can be acquired, which happens once
+// NodeThread::sendClientReq() has released it after sending the request.
+void
+JobTicket::waitTillReqSent()
+{
+    this->reqSentLock.acquire();
+}
+
+// Releases `lock`, which lets a caller blocked in wait(0) proceed.
+void
+JobTicket::putResult()
+{
+    this->lock.release();
+}
+
+// Returns the messages collected for this job so far.
+const std::vector<Message::MessagePtr>&
+JobTicket::getResponse() const
+{
+    return this->nodeResponse;
+}
+
+// Returns a multi-line description of the ticket: its id, the serialized
+// command and the option values. The text is cached in `repr` and rebuilt
+// only after a setter has cleared `isReprValid`.
+const std::string&
+JobTicket::toString()
+{
+    if (!isReprValid) {
+        repr.clear();
+        isReprValid = true;
+
+        repr.append("Job id=" + id + "\n");
+        repr.append(getMessageText());
+        repr.append("async=" + boost::lexical_cast<std::string>(async_) + "\n");
+        repr.append("keepJob=" + boost::lexical_cast<std::string>(keep) + "\n");
+        repr.append("waitTillSent=" + boost::lexical_cast<std::string>(waitTillSent_) + "\n");
+        repr.append("timeout=" + boost::lexical_cast<std::string>(timeout_) + "\n");
+    }
+    return repr;
+}
Added: trunk/apps/CppFCPLib/JobTicket.h
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.h (rev 0)
+++ trunk/apps/CppFCPLib/JobTicket.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,60 @@
+#ifndef JOBTICKET_H__
+#define JOBTICKET_H__
+
+#include "Log.h"
+#include "Message.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include "zthread/FastMutex.h"
+
+namespace FCPLib {
+
+class NodeThread;
+
+// A request handed to the node thread together with the state a client
+// needs to wait for it: the command message, option flags, the responses
+// collected so far and two mutexes used as "request sent" / "result ready"
+// signals.
+class JobTicket {
+ std::string id;
+ Message::MessagePtr cmd;
+ // Filled by NodeThread::doMessage().
+ std::vector<Message::MessagePtr> nodeResponse;
+ bool async_;
+ bool keep;
+ bool waitTillSent_;
+ int timeout_;
+
+ // Cached toString() output; valid only while isReprValid is true.
+ std::string repr;
+ bool isReprValid;
+
+// bool hasStream;
+// ostream stream;
+
+ // Acquired in the constructor; released by putResult().
+ ZThread::FastMutex lock;
+ // Acquired in the constructor; released by NodeThread::sendClientReq().
+ ZThread::FastMutex reqSentLock;
+ // Set from time(0) by NodeThread::sendClientReq() when the request is sent.
+ int timeQueued;
+
+ void putResult();
+public:
+ typedef boost::shared_ptr<JobTicket> JobTicketPtr;
+
+ JobTicket(std::string id_, Message::MessagePtr &cmd_);
+ // Chainable setters; each one invalidates the cached toString() text.
+ // NOTE(review): declared inline here but defined in JobTicket.cpp, so other
+ // translation units cannot see the definitions -- confirm this links.
+ inline JobTicket& async(bool async_);
+ inline JobTicket& keepJob(bool keep_);
+ inline JobTicket& waitTillSent(bool wait);
+ inline JobTicket& timeout(int timeout);
+
+ const std::string& commandName() const;
+ const std::string& getId() const;
+ const std::string& getMessageText() const;
+
+ // timeout_ == 0 waits for the result; otherwise waits for dispatch.
+ void wait(unsigned int timeout_=0);
+ void waitTillReqSent();
+
+ const std::vector<Message::MessagePtr>& getResponse() const;
+ const std::string& toString();
+
+
+ // NodeThread writes nodeResponse/timeQueued and releases the mutexes.
+ friend class NodeThread;
+};
+
+}
+
+#endif
Added: trunk/apps/CppFCPLib/Log.cpp
===================================================================
--- trunk/apps/CppFCPLib/Log.cpp (rev 0)
+++ trunk/apps/CppFCPLib/Log.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,44 @@
+
+#include "Log.h"
+#include "Exceptions.h"
+#include "zthread/Guard.h"
+
+using namespace FCPLib;
+
+// Binds the logger to the stream out_ and stores the highest verbosity
+// level that will be written.
+Logger::Logger(ostream &out_, verbosityLevel logLevel_)
+    : out(out_), logLevel(logLevel_)
+{}
+
+// Writes `message` to the output stream when logLevel_ is not more verbose
+// than the logger's own level, adding a newline only if the message does
+// not already end with one. The stream is flushed on every call. The whole
+// operation runs under the logger's mutex.
+//
+// Fix: the old check looked at message[strlen(message)], which is always
+// the terminating NUL, so a second newline was appended to messages that
+// already ended in '\n'. A NULL message is now ignored instead of being
+// streamed.
+void Logger::log(verbosityLevel logLevel_, const char *message)
+{
+    ZThread::Guard<ZThread::Mutex> g(lock);
+
+    if (message && logLevel_ <= logLevel) {
+        const size_t len = strlen(message);
+        out << message;
+        if (len == 0 || message[len - 1] != '\n')
+            out << '\n';
+    }
+    out.flush();
+}
+
+// std::string convenience overload; forwards to the C-string version.
+void Logger::log(verbosityLevel logLevel_, std::string message)
+{
+    const char *text = message.c_str();
+    log(logLevel_, text);
+}
+
+// Returns the process-wide Logger. The logger is created on the first call
+// from the arguments of that call; arguments of later calls are ignored.
+// The instance is never deleted, as before.
+//
+// The hand-rolled `firstTime` flag was replaced by initialization of a
+// function-local static, so concurrent first calls from the client thread
+// and the node thread no longer race on the flag when the compiler provides
+// thread-safe static initialization.
+Logger&
+log(ostream &out_, verbosityLevel logLevel_)
+{
+    static Logger* const instance = new Logger(out_, logLevel_);
+    return *instance;
+}
+
+
Added: trunk/apps/CppFCPLib/Log.h
===================================================================
--- trunk/apps/CppFCPLib/Log.h (rev 0)
+++ trunk/apps/CppFCPLib/Log.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,40 @@
+
+#ifndef LOG_H_
+#define LOG_H_
+
+
+#include "zthread/Thread.h"
+#include "zthread/Mutex.h"
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+namespace FCPLib {
+
+// Verbosity levels in increasing order of chattiness. Logger::log() writes a
+// message when its level is less than or equal to the logger's level.
+typedef enum {
+ SILENT,
+ FATAL,
+ CRITICAL,
+ ERROR,
+ INFO,
+ DETAIL,
+ DEBUG,
+ NOISY,
+} verbosityLevel;
+
+// Writes level-filtered messages to an output stream; every write happens
+// while holding `lock`.
+class Logger{
+ ZThread::Mutex lock;
+ // Destination stream; held by reference, so it must outlive the logger.
+ ostream &out;
+ // Most verbose level that is still written.
+ verbosityLevel logLevel;
+public:
+ Logger(ostream &out_, verbosityLevel logLevel_=ERROR);
+ // Write `message` if logLevel <= this logger's level.
+ void log(verbosityLevel logLevel, const char *message);
+ void log(verbosityLevel logLevel, string message);
+};
+
+}
+
+// Accessor for the shared Logger instance. The arguments are used only by
+// the call that creates the instance (see Log.cpp).
+FCPLib::Logger& log(ostream &out_=cerr, FCPLib::verbosityLevel
logLevel_=FCPLib::NOISY);
+
+#endif
Added: trunk/apps/CppFCPLib/Message.cpp
===================================================================
--- trunk/apps/CppFCPLib/Message.cpp (rev 0)
+++ trunk/apps/CppFCPLib/Message.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,94 @@
+#include "Message.h"
+
+
+using namespace std;
+using namespace FCPLib;
+
+// Creates an empty message: not a data message, no cached representation.
+Message::Message()
+    : isDataType(false), isReprValid(false)
+{
+}
+
+// Creates a message that has the given header line and no fields.
+Message::MessagePtr
+Message::factory(std::string &header){
+    Message::MessagePtr created( new Message() );
+    created->header = header;
+    return created;
+}
+
+// C-string convenience overload: copies the header into a std::string and
+// delegates to factory(std::string&).
+Message::MessagePtr
+Message::factory(const char *header){
+    std::string headerText = header;
+    return factory(headerText);
+}
+
+// Reads one line from the server into buf and strips a trailing '\n'.
+// Returns false when readln() reports that nothing could be read (EOF).
+static bool
+readMessageLine(Server &s, char *buf, size_t size)
+{
+    buf[0] = 0;
+    if (s.readln(buf, size) <= 0)
+        return false;
+
+    const size_t len = strlen(buf);
+    if (len > 0 && buf[len - 1] == '\n')
+        buf[len - 1] = 0;
+    return true;
+}
+
+// Reads one message from the server: the first line becomes the header and
+// every following "key=value" line becomes a field, until a line equal to
+// "End" or "EndMessage" is seen.
+//
+// Fixes over the imported version:
+//  * the line buffer is a local instead of a function-level static;
+//  * an empty read no longer writes to line[-1];
+//  * a line without '=' is skipped instead of dereferencing NULL;
+//  * EOF ends parsing (the message read so far is returned) instead of
+//    looping forever on a stale buffer.
+Message::MessagePtr
+Message::factory(Server &s){
+    Message::MessagePtr m( new Message() );
+    char line[1000];
+
+    if (!readMessageLine(s, line, sizeof(line)))
+        return m;   // connection closed before a header arrived
+    m->header = line;
+
+    while (readMessageLine(s, line, sizeof(line))) {
+        if (!strcmp(line, "End") || !strcmp(line, "EndMessage"))
+            break;
+
+        char *val = strchr(line, '=');
+        if (!val)
+            continue;   // not a key=value pair
+        *val++ = 0;
+        m->fields[std::string(line)] = std::string(val);
+    }
+
+    return m;
+}
+
+// Stores (or overwrites) a field and marks the cached text representation
+// as stale.
+void
+Message::setField(std::string key, std::string value) {
+    // TODO: should i check if a message can contain certain field?
+    fields[key] = value;
+    isReprValid = false;
+}
+
+// Returns the value stored under `key`, or an empty string when the message
+// has no such field.
+//
+// Fix: the lookup used operator[], which inserted an empty entry for every
+// missing key without invalidating the cached representation.
+std::string
+Message::getField(const std::string &key) {
+    std::map<std::string, std::string>::const_iterator it = fields.find(key);
+    if (it == fields.end())
+        return std::string();
+    return it->second;
+}
+
+
+// Serializes the message: the header line, one "key=value" line per field,
+// and then either "EndMessage" or, for data messages, "DataLength=<n>",
+// "Data" and the raw payload. The result is cached in `repr` until
+// setField() invalidates it.
+//
+// Fixes: the payload length was printed with "%d" although size() returns
+// size_t; the conversion buffer was a function-level static; and looking up
+// "Data" with operator[] inserted the key when it was missing.
+const std::string&
+Message::toString() {
+    if (isReprValid)
+        return repr;
+
+    typedef std::map<std::string, std::string>::const_iterator FieldIt;
+    const FieldIt end = fields.end();
+    const FieldIt data = fields.find("Data");
+
+    repr = header + "\n";
+    for (FieldIt it = fields.begin(); it != end; ++it) {
+        if (isDataType && it->first == "Data")
+            continue;   // the payload is emitted after the field list
+        repr += it->first + "=" + it->second + "\n";
+    }
+
+    if (isDataType) {
+        char length[32];
+        const unsigned long dataLength =
+            (data != end) ? (unsigned long) data->second.size() : 0UL;
+        sprintf(length, "%lu", dataLength);
+        repr += "DataLength=";
+        repr += length;
+        repr += "\n";
+        repr += "Data\n";
+        if (data != end)
+            repr += data->second;
+    } else {
+        repr += "EndMessage\n";
+    }
+    isReprValid = true;
+    return repr;
+}
+
+// Returns the header (first line) of the message.
+const std::string&
+Message::getHeader() const
+{
+    return this->header;
+}
Added: trunk/apps/CppFCPLib/Message.h
===================================================================
--- trunk/apps/CppFCPLib/Message.h (rev 0)
+++ trunk/apps/CppFCPLib/Message.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,40 @@
+
+
+#ifndef MESSAGE_H_
+#define MESSAGE_H_
+
+#include <string>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include "Server.h"
+
+namespace FCPLib {
+
+// A message made of a header line and a set of key/value fields. Instances
+// are created through the factory() functions only (the constructor is
+// private) and handed out as shared pointers.
+class Message {
+ // Cached toString() output; valid only while isReprValid is true.
+ std::string repr;
+ std::string header;
+
+ std::map<std::string, std::string> fields;
+
+ // When true, toString() emits the "Data" field as a trailing payload.
+ bool isDataType;
+ bool isReprValid;
+
+ Message();
+public:
+ typedef boost::shared_ptr<Message> MessagePtr;
+ // Empty message with the given header.
+ static MessagePtr factory(std::string &header);
+ static MessagePtr factory(const char *header);
+ // Message parsed from lines read from the server connection.
+ static MessagePtr factory(Server &s);
+
+ void setField(std::string key, std::string value);
+ // NOTE(review): declared inline but defined in Message.cpp.
+ inline std::string getField(const std::string &key);
+ const std::string& getHeader() const;
+
+ const std::string& toString();
+};
+
+}
+
+#endif
+
+
Added: trunk/apps/CppFCPLib/Node.cpp
===================================================================
--- trunk/apps/CppFCPLib/Node.cpp (rev 0)
+++ trunk/apps/CppFCPLib/Node.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,44 @@
+
+#include "Node.h"
+#include "Message.h"
+#include "Log.h"
+
+using namespace FCPLib;
+
+// Builds an identifier of the form "id<seconds>" from the current time(0)
+// value. Two calls within the same second return the same string.
+std::string
+Node::_getUniqueId() {
+    char buffer[100];
+    const int now = (int) time(0);
+    sprintf(buffer, "id%d", now);
+    return string(buffer);
+}
+
+// Connects to the node and performs the hello handshake.
+//
+// Creates the request queue, starts a NodeThread on the executor, queues a
+// "ClientHello" job under the id "__hello" and blocks in wait(0) until the
+// node thread has stored the "NodeHello" reply on that job. An empty name_
+// is replaced by _getUniqueId(). NodeThread's Server member opens the
+// connection in its constructor, so connection errors surface from here.
+Node::Node(std::string name_, std::string host, int port)
+ : name(name_),
+ clientReqQueue( new TQueue<JobTicket::JobTicketPtr>() )
+{
+ if (!name.size())
+ name = Node::_getUniqueId();
+ log().log(DEBUG, "Node started name=" + name + "\n");
+
+ // The node thread shares clientReqQueue with this object.
+ nodeThread = new NodeThread(host, port, clientReqQueue);
+ executor.execute( nodeThread );
+
+ Message::MessagePtr m = Message::factory("ClientHello");
+ m->setField("Name", name);
+ m->setField("ExpectedVersion", "2.0");
+
+ log().log(DEBUG, "Creating ClientHello\n");
+ // NodeThread::doMessage() looks the job up under this exact id.
+ JobTicket::JobTicketPtr job( new JobTicket("__hello", m) );
+ log().log(DEBUG, job->toString());
+ clientReqQueue->put(job);
+
+ log().log(DEBUG, "waiting for the NodeHello");
+ // 0 = no timeout: block until putResult() is called for this job.
+ job->wait(0);
+ log().log(DEBUG, "NodeHello arrived");
+// cout << "Izlaz:\n" << job->getResponse()[0]->toString();
+}
+
+// Interrupts the executor's threads; NodeThread::run() polls
+// Thread::interrupted() and leaves its loop in response.
+Node::~Node()
+{
+ executor.interrupt();
+}
Added: trunk/apps/CppFCPLib/Node.h
===================================================================
--- trunk/apps/CppFCPLib/Node.h (rev 0)
+++ trunk/apps/CppFCPLib/Node.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,26 @@
+#ifndef NODE_H__
+#define NODE_H__
+
+#include <string>
+#include <memory>
+#include "zthread/Thread.h"
+#include "zthread/ThreadedExecutor.h"
+#include "TQueue.h"
+#include "NodeThread.h"
+
+namespace FCPLib {
+// Client-side handle for one node connection: starts the NodeThread and
+// performs the hello handshake in its constructor.
+class Node {
+ std::string name;
+ // Queue of jobs handed from client code to the node thread.
+ ZThread::CountedPtr< JobTicketQueue > clientReqQueue;
+ // Passed to executor.execute(); presumably owned by the executor from
+ // then on -- TODO confirm against the ZThread Task semantics.
+ NodeThread *nodeThread;
+ ZThread::ThreadedExecutor executor;
+
+ // Fallback name of the form "id<time>" used when none is supplied.
+ static std::string _getUniqueId();
+
+public:
+ Node(std::string name, std::string host, int port);
+ ~Node();
+};
+}
+
+#endif
Added: trunk/apps/CppFCPLib/NodeThread.cpp
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.cpp (rev 0)
+++ trunk/apps/CppFCPLib/NodeThread.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,114 @@
+
+
+#include "NodeThread.h"
+#include "Log.h"
+#include <ctime>
+
+using namespace FCPLib;
+using namespace ZThread;
+
+// Stores the shared request queue, opens the server connection (the Server
+// member connects in its constructor) and creates an empty job map.
+// NOTE(review): the function is declared throw(), yet Server's constructor
+// throws StdError on resolve/socket/connect failure; such an exception would
+// violate the specification. Changing it requires touching NodeThread.h too.
+NodeThread::NodeThread(std::string &host,
+ int port,
+ ZThread::CountedPtr<TQueue<JobTicket::JobTicketPtr> >
&clientReqQueue_) throw()
+ : clientReqQueue(clientReqQueue_),
+ s(host, port),
+ jobs( new std::map<std::string, JobTicket::JobTicketPtr>() )
+{
+}
+
+// Manager loop of the node thread. Until the thread is interrupted it
+// alternates between two tasks: reading and dispatching a message when the
+// server socket has data, and sending the next queued client request when
+// the request queue is not empty. A ZThread synchronization exception ends
+// the loop silently.
+void NodeThread::run(){
+    Message::MessagePtr m;
+    JobTicket::JobTicketPtr job;
+    log().log(DETAIL, "FCPNode: manager thread starting");
+    try {
+        for (;;) {
+            if (Thread::interrupted())
+                break;
+
+            // incoming message from the node?
+            log().log(NOISY, "_mgrThread: Testing for incoming message");
+            const bool nodeHasData = s.dataAvailable();
+            if (nodeHasData) {
+                log().log(DEBUG, "_mgrThread: Retrieving incoming message");
+                m = Message::factory(s);
+                log().log(DEBUG, "_mgrThread: Got incoming message, dispatching");
+                doMessage(m);
+            }
+
+            // incoming request from the client?
+            if (clientReqQueue->empty())
+                continue;
+            log().log(DEBUG, "_mgrThread: Got incoming client req");
+            job = clientReqQueue->get();
+            log().log(DEBUG, "_mgrThread: Got incoming client req from the queue");
+            log().log(DEBUG, job->toString());
+            sendClientReq(job);
+        }
+    } catch (ZThread::Synchronization_Exception& e) {
+        // do some cleanup
+    }
+}
+
+// Sends a client job to the node. Every job except "WatchGlobal" is first
+// registered in the job map under its id so that replies can be matched to
+// it. After the message text has been sent, the send time is recorded and
+// the job's reqSentLock is released.
+void
+NodeThread::sendClientReq(JobTicket::JobTicketPtr &job)
+{
+    log().log(NOISY, "sendClientReq : top");
+    const bool trackJob = (job->commandName() != "WatchGlobal");
+    if (trackJob) {
+        log().log(NOISY, "sendClientReq : about to add the job to the map");
+        (*jobs)[job->getId()] = job;
+        log().log(NOISY, "sendClientReq : added the job to the map");
+    }
+
+    const std::string &text = job->getMessageText();
+    s.send(text);
+    job->timeQueued = (unsigned int) time(0);
+    job->reqSentLock.release();
+}
+
+//
+//void NodeThread::_hello(){
+// string message;
+// message = Message::toString("ClientHello", 2, "Name", this->name.c_str(), "ExpectedVersion", "2.0");
+// messageQSend->put(message);
+// logfile->log(DETAIL, message);
+//
+// message = this->_rxMsg();
+// Message* nodeHello = new Message(message);
+// const char * tmp;
+// if ((tmp = nodeHello->getField("FCPVersion")) != NULL){
+// this->nodeFCPVersion = string(tmp);
+// }
+// if ((tmp = nodeHello->getField("Version")) != NULL){
+// this->nodeVersion = string(tmp);
+// }
+// if ((tmp = nodeHello->getField("Testnet")) != NULL){
+// this->nodeIsTestnet = (string(tmp) == "true") ? true : false;
+// }
+// if ((tmp = nodeHello->getField("CompressionCodes")) != NULL){
+// this->compressionCodes = atoi(tmp);
+// }
+//
+// delete nodeHello;
+//}
+
+
+//
+//string NodeThread::_rxMsg(){
+// string message = messageQReceive->get();
+// logfile->log(DETAIL, message);
+// return message;
+//}
+//
+//void NodeThread::_on_rxMsg(std::string& message){
+// Message m(message);
+//
+// logfile->log(DEBUG, m.toString());
+//}
+
+// Dispatches a message received from the node. Only "NodeHello" is handled:
+// it is appended to the responses of the job registered as "__hello" and
+// that job is marked as having a result. Other headers are ignored.
+//
+// Fix: the result of find() was dereferenced without checking for end(), so
+// a NodeHello arriving with no pending "__hello" job was undefined behaviour.
+void
+NodeThread::doMessage(Message::MessagePtr &message)
+{
+    if (message->getHeader() == "NodeHello"){
+        std::map<std::string, JobTicket::JobTicketPtr>::iterator it = jobs->find("__hello");
+        if (it == jobs->end()) {
+            log().log(ERROR, "doMessage: NodeHello received but no __hello job is pending");
+            return;
+        }
+        JobTicket::JobTicketPtr job = it->second;
+        job->nodeResponse.push_back(message);
+        job->putResult();
+        return;
+    }
+}
+
+
Added: trunk/apps/CppFCPLib/NodeThread.h
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.h (rev 0)
+++ trunk/apps/CppFCPLib/NodeThread.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,40 @@
+#ifndef NODETHREAD_H__
+#define NODETHREAD_H__
+
+#include "zthread/Thread.h"
+#include "TQueue.h"
+#include "Log.h"
+#include "Server.h"
+#include <map>
+#include <string>
+#include "JobTicket.h"
+
+namespace FCPLib {
+
+class Node;
+
+typedef TQueue<JobTicket::JobTicketPtr> JobTicketQueue;
+
+// Runnable that owns the server connection. Its run() loop forwards queued
+// client jobs to the node and dispatches messages coming back. Only Node can
+// construct it (private constructor, friend class).
+class NodeThread : public ZThread::Runnable {
+// ZThread::CountedPtr<Logger> logfile;
+ // Jobs submitted by the client side; shared with Node.
+ ZThread::CountedPtr< JobTicketQueue > clientReqQueue;
+ FCPLib::Server s;
+ // Jobs that have been sent, keyed by job id.
+ ZThread::CountedPtr< std::map<std::string, JobTicket::JobTicketPtr> > jobs;
+
+// void _hello();
+// static std::string _getUniqueId();
+// std::string _rxMsg();
+// void _on_rxMsg(std::string &message);
+
+ friend class Node;
+ NodeThread(std::string &host, int port, ZThread::CountedPtr< JobTicketQueue
> &clientReqQueue_) throw();
+
+ void sendClientReq(JobTicket::JobTicketPtr &job);
+ void doMessage(Message::MessagePtr &message);
+public:
+ void run();
+};
+
+}
+#endif
+
Added: trunk/apps/CppFCPLib/Server.cpp
===================================================================
--- trunk/apps/CppFCPLib/Server.cpp (rev 0)
+++ trunk/apps/CppFCPLib/Server.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,150 @@
+
+#include "Server.h"
+#include "Exceptions.h"
+#include "DefaultValues.h"
+#include <cstring>
+#include <string>
+#include <netdb.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/poll.h>
+
+#include "Log.h"
+
+using namespace FCPLib;
+
+
+// Opens a TCP connection to host:port.
+//
+// An empty host is replaced by defaultFCPHost (the caller's string is
+// updated, as before) and a non-positive port by defaultFCPPort.
+// Throws StdError when the name cannot be resolved, the socket cannot be
+// created or the connection fails.
+//
+// Fixes:
+//  * gethostbyname() reports failures through h_errno, not errno, so the
+//    resolver error text now comes from hstrerror(h_errno);
+//  * errno is saved before close() on the connect failure path, because
+//    close() may overwrite it before the message is built;
+//  * the defaults come from DefaultValues.h instead of duplicated literals.
+Server::Server(std::string &host, int port){
+    /* Use local host as default address */
+    if (host.empty())
+        host = defaultFCPHost;
+
+    /* Use default port */
+    if (port <= 0)
+        port = defaultFCPPort;
+
+    /* Resolve hostname */
+    struct hostent *he = gethostbyname(host.c_str());
+    if (he == NULL)
+        throw StdError(__FUNCTION__, "Failed to resolve", hstrerror(h_errno));
+
+    /* Snag socket */
+    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
+        throw StdError(__FUNCTION__, "Failed to get socket", strerror(errno));
+
+    struct sockaddr_in addr;
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(port);
+    addr.sin_addr = *((struct in_addr *)he->h_addr);
+
+    /* Connect */
+    if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+        const int savedErrno = errno;
+        close(sockfd);
+        errno = savedErrno;   // StdError also records errno itself
+        throw StdError(__FUNCTION__, "Failed to connect", strerror(savedErrno));
+    }
+}
+
+// Closes the socket opened by the constructor.
+Server::~Server(){
+ close(sockfd);
+}
+
+// Reads up to n bytes from the socket into vptr, retrying reads interrupted
+// by a signal (EINTR). Stops early at end of file. Returns the number of
+// bytes actually read; throws StdError on any other read error.
+ssize_t Server::readn(void *vptr, size_t n){
+    char *cursor = (char*) vptr;
+    size_t remaining = n;
+
+    while (remaining > 0) {
+        ssize_t got = read(sockfd, cursor, remaining);
+        if (got == 0)
+            break;  // EOF
+        if (got < 0) {
+            if (errno != EINTR)
+                throw StdError(__FUNCTION__, "", strerror(errno));
+            got = 0;    // interrupted: nothing consumed, try again
+        }
+        remaining -= got;
+        cursor += got;
+    }
+
+    return (n - remaining);
+}
+
+// Writes exactly n bytes from vptr to the socket, retrying writes that were
+// interrupted by a signal (EINTR). Returns n; throws StdError when write()
+// fails for another reason or makes no progress.
+//
+// Fix: errno is meaningful only when write() returns a negative value. The
+// old code also consulted it when write() returned 0, so a stale EINTR could
+// keep the loop spinning.
+ssize_t Server::writen(const void *vptr, size_t n){
+    const char *ptr = (const char*) vptr;
+    size_t nleft = n;
+
+    while (nleft > 0) {
+        ssize_t nwritten = write(sockfd, ptr, nleft);
+        if (nwritten <= 0) {
+            if (nwritten < 0 && errno == EINTR)
+                nwritten = 0;   // interrupted before writing: retry
+            else
+                throw StdError(__FUNCTION__, "", strerror(errno));
+        }
+        nleft -= nwritten;
+        ptr += nwritten;
+    }
+
+    return (n);
+}
+
+// Reads one line (up to and including '\n') from the socket into vptr,
+// storing at most maxlen - 1 bytes, and always NUL-terminates the buffer.
+// Returns the number of bytes stored; 0 means end of file before any byte
+// was read. Throws StdError on a read error other than EINTR.
+//
+// Fixes:
+//  * read() errors were ignored and the loop kept counting; EINTR is now
+//    retried and other errors are reported like in readn()/writen();
+//  * on immediate EOF the buffer was left unterminated (stale contents);
+//  * on EOF after some bytes the return value was one too large;
+//  * maxlen == 0 no longer writes to the buffer;
+//  * no signed/unsigned comparison between the counter and maxlen.
+ssize_t Server::readln(void *vptr, size_t maxlen){
+    if (maxlen == 0)
+        return 0;
+
+    char *ptr = (char *)vptr;
+    size_t stored = 0;
+
+    while (stored + 1 < maxlen) {
+        char c;
+        const ssize_t rc = read(sockfd, &c, 1);
+        if (rc == 1) {
+            ptr[stored++] = c;
+            if (c == '\n')
+                break;
+        } else if (rc == 0) {
+            break;  // EOF
+        } else if (errno != EINTR) {
+            throw StdError(__FUNCTION__, "", strerror(errno));
+        }
+    }
+    ptr[stored] = 0;
+    return (ssize_t) stored;
+}
+
+// Logs the outgoing text at DEBUG level and writes it to the socket in full.
+void Server::send(const std::string &s){
+    const std::string banner = "Sending:\n"+s+"-----------------\n";
+    log().log(DEBUG, banner);
+    writen(s.c_str(), s.length());
+}
+
+// Polls the socket for up to pollTimeout milliseconds. Returns true when
+// data can be read and false on timeout. Throws StdError when poll() fails
+// or reports an error or hang-up condition on the socket.
+//
+// Fix: the exceptions were thrown as heap pointers (`throw new StdError`),
+// which leaks them and cannot be caught by a handler for StdError or
+// std::exception. They are now thrown by value like everywhere else in this
+// file.
+bool Server::dataAvailable(){
+    struct pollfd mypol;
+    mypol.fd = sockfd;
+    mypol.events = POLLERR | POLLHUP | POLLIN;
+    mypol.revents = 0;
+
+    const int pollret = poll(&mypol, 1, pollTimeout);
+
+    if (pollret < 0)
+        throw StdError(__FUNCTION__, "poll error", strerror(errno));
+
+    if (!pollret)
+        return false;   // timed out, nothing to read
+
+    /* if we get an error, die */
+    if (mypol.revents & POLLERR)
+        throw StdError(__FUNCTION__, "poll error", strerror(errno));
+
+    if (mypol.revents & POLLHUP)
+        throw StdError(__FUNCTION__, "poll hangup", strerror(errno));
+
+    return (mypol.revents & POLLIN) != 0;
+}
Added: trunk/apps/CppFCPLib/Server.h
===================================================================
--- trunk/apps/CppFCPLib/Server.h (rev 0)
+++ trunk/apps/CppFCPLib/Server.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,25 @@
+
+#ifndef SERVER_H__
+#define SERVER_H__
+
+#include <string>
+
+namespace FCPLib {
+
+// Thin wrapper around a connected TCP socket. Only NodeThread can create
+// one (private constructor, friend class).
+class Server {
+ friend class NodeThread;
+ int sockfd;
+ // Connects to host:port; an empty host or a non-positive port selects the
+ // built-in defaults. Throws StdError on failure.
+ Server(std::string &host, int port=-1);
+public:
+ ~Server();
+ // Read up to n bytes, stopping early only at end of file.
+ ssize_t readn(void *vptr, size_t n);
+ // Read one '\n'-terminated line into a buffer of maxlen bytes.
+ ssize_t readln(void *vptr, size_t maxlen);
+ // Write exactly n bytes.
+ ssize_t writen(const void *vptr, size_t n);
+ // Log and write the whole string.
+ void send(const std::string &s);
+ // True when poll() reports readable data within pollTimeout.
+ bool dataAvailable();
+};
+
+}
+
+
+#endif
Added: trunk/apps/CppFCPLib/TQueue.h
===================================================================
--- trunk/apps/CppFCPLib/TQueue.h (rev 0)
+++ trunk/apps/CppFCPLib/TQueue.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,38 @@
+#ifndef TQUEUE_H__
+#define TQUEUE_H__
+
+#include <deque>
+#include "zthread/Thread.h"
+#include "zthread/Condition.h"
+#include "zthread/Mutex.h"
+#include "zthread/Guard.h"
+
+namespace FCPLib {
+
+// FIFO queue shared between threads. Every operation runs under `lock`;
+// get() blocks on the condition variable while the queue is empty and put()
+// signals it after appending an item.
+template<class T> class TQueue {
+    ZThread::Mutex lock;
+    ZThread::Condition cond;
+    std::deque<T> data;
+public:
+    TQueue() : cond(lock) {}
+
+    // Appends a copy of item and wakes one waiting consumer.
+    void put(T item) {
+        ZThread::Guard<ZThread::Mutex> guard(lock);
+        data.push_back(item);
+        cond.signal();
+    }
+
+    // Removes and returns the oldest item, waiting until one is available.
+    T get() {
+        ZThread::Guard<ZThread::Mutex> guard(lock);
+        while (data.empty()) {
+            cond.wait();
+        }
+        T head = data.front();
+        data.pop_front();
+        return head;
+    }
+
+    // Reports whether the queue held no items at the time of the call.
+    bool empty() {
+        ZThread::Guard<ZThread::Mutex> guard(lock);
+        const bool nothingQueued = data.empty();
+        return nothingQueued;
+    }
+};
+
+}
+#endif
Added: trunk/apps/CppFCPLib/main.cpp
===================================================================
--- trunk/apps/CppFCPLib/main.cpp (rev 0)
+++ trunk/apps/CppFCPLib/main.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,22 @@
+#include <iostream>
+#include <string>
+#include "Server.h"
+#include "Node.h"
+
+using namespace std;
+using namespace FCPLib;
+
+// Smoke test: builds a Node named "123" with the default host and port,
+// which connects and performs the hello handshake, then exits.
+int main()
+{
+// char line[100];
+// Server s;
+// s.send("ClientHello\nName=123\nExpectedVersion=2.0\nEndMessage\n");
+//
+// for(;;){
+// s.readln(line, 100);
+// cout << line;
+// }
+
+    {
+        Node node("123", "", -1);
+    }
+    return 0;
+}