Author: mkolar
Date: 2007-06-01 16:56:01 +0000 (Fri, 01 Jun 2007)
New Revision: 13446

Added:
   trunk/apps/CppFCPLib/
   trunk/apps/CppFCPLib/DefaultValues.h
   trunk/apps/CppFCPLib/Exceptions.cpp
   trunk/apps/CppFCPLib/Exceptions.h
   trunk/apps/CppFCPLib/JobTicket.cpp
   trunk/apps/CppFCPLib/JobTicket.h
   trunk/apps/CppFCPLib/Log.cpp
   trunk/apps/CppFCPLib/Log.h
   trunk/apps/CppFCPLib/Message.cpp
   trunk/apps/CppFCPLib/Message.h
   trunk/apps/CppFCPLib/Node.cpp
   trunk/apps/CppFCPLib/Node.h
   trunk/apps/CppFCPLib/NodeThread.cpp
   trunk/apps/CppFCPLib/NodeThread.h
   trunk/apps/CppFCPLib/Server.cpp
   trunk/apps/CppFCPLib/Server.h
   trunk/apps/CppFCPLib/TQueue.h
   trunk/apps/CppFCPLib/main.cpp
Log:
Initial import of CppFCPLib to the repository

Added: trunk/apps/CppFCPLib/DefaultValues.h
===================================================================
--- trunk/apps/CppFCPLib/DefaultValues.h                                (rev 0)
+++ trunk/apps/CppFCPLib/DefaultValues.h        2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,19 @@
+
+#ifndef DEFAULTVALUES_H_
+#define DEFAULTVALUES_H_
+
+#include <string>
+//#include "Log.h"
+
+// Default endpoints used when the caller does not supply a host or port.
+const std::string defaultFCPHost = "127.0.0.1";
+const int defaultFCPPort = 9481;
+const std::string defaultFProxyHost = "127.0.0.1";
+const int defaultFProxyPort = 8888;
+//const FCPLib::verbosityLevel defaultVerbosity = FCPLib::ERROR;
+const int inputBufferSize = 64 * 1024;
+const int outputBufferSize = 64 * 1024;
+const int maxSizeMessage = 8192;
+// Timeout handed to poll() by Server::dataAvailable (milliseconds per POSIX poll).
+const int pollTimeout = 100;
+// Default JobTicket timeout. The previous value, 86400 * 365 * 1000
+// (31,536,000,000), does not fit in an int and overflowed. One year is now
+// expressed in seconds. NOTE(review): JobTicket::wait measures elapsed time
+// with time(0), i.e. in seconds - confirm that seconds is the intended unit.
+const int oneyear = 86400 * 365;
+
+#endif

Added: trunk/apps/CppFCPLib/Exceptions.cpp
===================================================================
--- trunk/apps/CppFCPLib/Exceptions.cpp                         (rev 0)
+++ trunk/apps/CppFCPLib/Exceptions.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,31 @@
+#include "Exceptions.h"
+
+using namespace FCPLib;
+
+StdError::StdError() :
+       std::runtime_error(strerror(errno)),
+       error(errno)
+{
+}
+
+StdError::StdError(int error_) :
+       std::runtime_error(strerror(error_)),
+       error(error_)
+{
+}
+
+StdError::StdError(std::string &func, std::string &message, std::string 
&errstring) :
+  std::runtime_error(func + " : " + message + " " + errstring),
+  error(errno)
+{
+}
+
+// C-string overload of the three-part constructor: same
+// "<func> : <message> <errstring>" message, with errno captured in `error`.
+StdError::StdError(const char *func, const char *message, const char *errstring)
+  : std::runtime_error(std::string(func) + " : " + message + " " + errstring),
+    error(errno)
+{}
+
+// Nothing to release; declared throw() to match std::runtime_error's destructor.
+StdError::~StdError() throw() {}

Added: trunk/apps/CppFCPLib/Exceptions.h
===================================================================
--- trunk/apps/CppFCPLib/Exceptions.h                           (rev 0)
+++ trunk/apps/CppFCPLib/Exceptions.h   2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,22 @@
+
+#ifndef EXCEPTIONS_H__
+#define EXCEPTIONS_H__
+
+#include <stdexcept>
+#include <cerrno>
+
+namespace FCPLib {
+
+class StdError : public std::runtime_error {
+  int error;
+public:
+  StdError();
+  StdError(int error_);
+  StdError(std::string &func, std::string &message, std::string &errstring);
+  StdError(const char *func, const char *message, const char *errstring);
+  ~StdError() throw();
+};
+
+}
+
+#endif

Added: trunk/apps/CppFCPLib/JobTicket.cpp
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.cpp                          (rev 0)
+++ trunk/apps/CppFCPLib/JobTicket.cpp  2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,125 @@
+
+#include "JobTicket.h"
+#include "DefaultValues.h"
+#include "zthread/Thread.h"
+#include <boost/lexical_cast.hpp>
+
+using namespace FCPLib;
+
+// Creates a ticket for the command `cmd_`, identified by `id_`.
+// Both mutexes are taken here so that wait() and waitTillReqSent() block until
+// putResult() releases `lock` and NodeThread::sendClientReq() releases
+// `reqSentLock`.
+JobTicket::JobTicket(std::string id_, Message::MessagePtr &cmd_)
+  : id(id_),
+    cmd(cmd_),
+    async_(false),
+    keep(false),
+    waitTillSent_(false),
+    timeout_(oneyear),
+    isReprValid(false),
+//    hasStream(false)
+    timeQueued(0)   // previously indeterminate until the request had been sent
+{
+  lock.acquire();
+  reqSentLock.acquire();
+}
+
+// Stores the async flag, drops the cached toString() text and returns *this
+// so that setters can be chained.
+JobTicket&
+JobTicket::async(bool async)
+{
+  async_ = async;
+  isReprValid = false;
+  return *this;
+}
+
+// Stores the keep flag, drops the cached toString() text and returns *this.
+// Not marked inline: the definition lives in this .cpp file only, and an
+// inline function must be defined in every translation unit that calls it.
+JobTicket&
+JobTicket::keepJob(bool keep_)
+{
+  isReprValid = false;
+  keep = keep_;
+  return *this;
+}
+// Stores the wait-till-sent flag, drops the cached toString() text and
+// returns *this. Not marked inline because it is defined only in this .cpp.
+JobTicket&
+JobTicket::waitTillSent(bool wait_)
+{
+  isReprValid = false;
+  waitTillSent_ = wait_;
+  return *this;
+}
+// Stores the timeout value, drops the cached toString() text and returns
+// *this. Not marked inline because it is defined only in this .cpp.
+JobTicket&
+JobTicket::timeout(int timeout)
+{
+  isReprValid = false;
+  timeout_ = timeout;
+  return *this;
+}
+
+// Identifier given to the ticket at construction.
+const std::string&
+JobTicket::getId() const
+{
+  return this->id;
+}
+
+// Header line of the command message this ticket carries.
+const std::string&
+JobTicket::commandName() const
+{
+  const std::string &header = cmd->getHeader();
+  return header;
+}
+
+// Serialised text of the command message, as produced by Message::toString().
+const std::string&
+JobTicket::getMessageText() const
+{
+  const std::string &text = cmd->toString();
+  return text;
+}
+
+// Blocks the caller until the job makes progress.
+//  - timeout == 0: polls `lock` until it can be taken (putResult() releases
+//    it), then gives it straight back and returns.
+//  - timeout  > 0: polls `reqSentLock` (released by the node thread after the
+//    request has been written) and gives up once `timeout` seconds, measured
+//    with time(0), have elapsed.
+// NOTE(review): the tryAcquire()/sleep() arguments are presumably
+// milliseconds - confirm against the ZThread documentation.
+void
+JobTicket::wait(unsigned int timeout)
+{
+  if (!timeout){
+    while (!lock.tryAcquire(100))
+      ZThread::Thread::sleep(100);
+    lock.release();
+    return;
+  }
+
+  const unsigned int then = (unsigned int) time(0);
+  while (!reqSentLock.tryAcquire(100)){
+    const unsigned int elapsed = (unsigned int) time(0) - then;
+    if (elapsed >= timeout){
+      // Time is up: stop waiting. The old loop kept spinning on the lock here.
+      log().log(DEBUG, "wait:"+commandName()+":"+getId()+": timeout on send command");
+      return;
+    }
+    ZThread::Thread::sleep(1000);
+    log().log(DEBUG, "wait:"+commandName()+":"+getId()+": job not dispatched, timeout in " +
+      boost::lexical_cast<std::string>(timeout-elapsed));
+  }
+}
+
+// Blocks until `reqSentLock` can be acquired; the lock is not released here.
+void
+JobTicket::waitTillReqSent()
+{
+  this->reqSentLock.acquire();
+}
+
+// Releases `lock`, which lets a caller blocked in wait(0) continue.
+void
+JobTicket::putResult()
+{
+  this->lock.release();
+}
+
+// Messages collected for this job so far.
+const std::vector<Message::MessagePtr>&
+JobTicket::getResponse() const
+{
+  return this->nodeResponse;
+}
+
+// Human-readable dump of the ticket (id, message text and flags).
+// The text is cached in `repr` and rebuilt only after a setter has cleared
+// `isReprValid`.
+const std::string&
+JobTicket::toString()
+{
+  if (!isReprValid) {
+    repr = "";
+    isReprValid = true;
+
+    repr += "Job id=" + id + "\n";
+    repr += getMessageText();
+
+    const std::string asyncText = boost::lexical_cast<std::string>(async_);
+    repr += "async=" + asyncText + "\n";
+    const std::string keepText = boost::lexical_cast<std::string>(keep);
+    repr += "keepJob=" + keepText + "\n";
+    const std::string waitText = boost::lexical_cast<std::string>(waitTillSent_);
+    repr += "waitTillSent=" + waitText + "\n";
+    const std::string timeoutText = boost::lexical_cast<std::string>(timeout_);
+    repr += "timeout=" + timeoutText + "\n";
+  }
+
+  return repr;
+}

Added: trunk/apps/CppFCPLib/JobTicket.h
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.h                            (rev 0)
+++ trunk/apps/CppFCPLib/JobTicket.h    2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,60 @@
+#ifndef JOBTICKET_H__
+#define JOBTICKET_H__
+
+#include "Log.h"
+#include "Message.h"
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include "zthread/FastMutex.h"
+
+namespace FCPLib {
+
+class NodeThread;
+
+// One client request queued for the node, plus the responses collected for it.
+// NodeThread is a friend: it releases `reqSentLock`, stamps `timeQueued`,
+// appends to `nodeResponse` and calls putResult().
+class JobTicket {
+  std::string id;
+  Message::MessagePtr cmd;
+  std::vector<Message::MessagePtr> nodeResponse;
+  bool async_;
+  bool keep;
+  bool waitTillSent_;
+  int timeout_;
+
+  // Cached toString() output and its validity flag.
+  std::string repr;
+  bool isReprValid;
+
+//  bool hasStream;
+//  ostream stream;
+
+  ZThread::FastMutex lock;         // released by putResult()
+  ZThread::FastMutex reqSentLock;  // released once the request has been sent
+  int timeQueued;
+
+  void putResult();
+public:
+  typedef boost::shared_ptr<JobTicket> JobTicketPtr;
+
+  JobTicket(std::string id_, Message::MessagePtr &cmd_);
+  // Chainable setters. They are defined in JobTicket.cpp, so they are not
+  // declared inline: an inline function needs its definition in every
+  // translation unit that calls it, and other files would fail to link.
+  JobTicket& async(bool async_);
+  JobTicket& keepJob(bool keep_);
+  JobTicket& waitTillSent(bool wait);
+  JobTicket& timeout(int timeout);
+
+  const std::string& commandName() const;
+  const std::string& getId() const;
+  const std::string& getMessageText() const;
+
+  // timeout_ == 0 waits for the result; otherwise waits for the request to be sent.
+  void wait(unsigned int timeout_=0);
+  void waitTillReqSent();
+
+  const std::vector<Message::MessagePtr>& getResponse() const;
+  const std::string& toString();
+
+
+  friend class NodeThread;
+};
+
+}
+
+#endif

Added: trunk/apps/CppFCPLib/Log.cpp
===================================================================
--- trunk/apps/CppFCPLib/Log.cpp                                (rev 0)
+++ trunk/apps/CppFCPLib/Log.cpp        2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,44 @@
+
+#include "Log.h"
+#include "Exceptions.h"
+#include "zthread/Guard.h"
+
+using namespace FCPLib;
+
+// Binds the logger to the stream `out_` and sets the verbosity threshold.
+Logger::Logger(ostream &out_, verbosityLevel logLevel_)
+  : out(out_), logLevel(logLevel_)
+{}
+
+// Writes `message` to the stream when logLevel_ is at or below the configured
+// threshold, appending '\n' only if the message does not already end with one.
+// The stream is flushed on every call, and the mutex is held throughout.
+// A NULL message is ignored.
+void Logger::log(verbosityLevel logLevel_, const char *message)
+{
+  ZThread::Guard<ZThread::Mutex> g(lock);
+
+  if (message && logLevel_ <= logLevel) {
+      const size_t len = strlen(message);
+      out << message;
+      // The last character is at len-1. The old test looked at message[len],
+      // which is always the terminating NUL, so a newline was always added.
+      if (len == 0 || message[len - 1] != '\n')
+        out << '\n';
+  }
+  out.flush();
+}
+
+// std::string convenience overload; forwards to the C-string version.
+void Logger::log(verbosityLevel logLevel_, std::string message)
+{
+  const char *text = message.c_str();
+  log(logLevel_, text);
+}
+
+// Returns the process-wide Logger. It is created on the first call from that
+// call's arguments; arguments of later calls are ignored. The instance is
+// never deleted, as before.
+// A function-local static replaces the hand-rolled `firstTime` flag, which
+// two threads making the first call together could both see as true.
+Logger&
+log(ostream &out_, verbosityLevel logLevel_)
+{
+  static Logger* const l = new Logger(out_, logLevel_);
+
+  return *l;
+}
+

Added: trunk/apps/CppFCPLib/Log.h
===================================================================
--- trunk/apps/CppFCPLib/Log.h                          (rev 0)
+++ trunk/apps/CppFCPLib/Log.h  2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,40 @@
+
+#ifndef LOG_H_
+#define LOG_H_
+
+
+#include "zthread/Thread.h"
+#include "zthread/Mutex.h"
+#include <iostream>
+#include <string>
+
+using namespace std;
+
+namespace FCPLib {
+
+// Log levels in increasing numeric order. Logger::log prints a message when
+// its level is less than or equal to the logger's threshold.
+enum verbosityLevel {
+    SILENT,
+    FATAL,
+    CRITICAL,
+    ERROR,
+    INFO,
+    DETAIL,
+    DEBUG,
+    NOISY
+};
+
+// Writes level-filtered messages to one output stream, guarded by a mutex.
+class Logger{
+public:
+    Logger(ostream &out_, verbosityLevel logLevel_=ERROR);
+    // Both overloads print `message` if logLevel passes the threshold.
+    void log(verbosityLevel logLevel, const char *message);
+    void log(verbosityLevel logLevel, string message);
+
+private:
+  ZThread::Mutex lock;
+  ostream &out;
+  verbosityLevel logLevel;
+};
+
+}
+
+// Accessor for the shared Logger; the defaults apply to the call that creates it.
+FCPLib::Logger& log(std::ostream &out_ = std::cerr,
+                    FCPLib::verbosityLevel logLevel_ = FCPLib::NOISY);
+
+#endif

Added: trunk/apps/CppFCPLib/Message.cpp
===================================================================
--- trunk/apps/CppFCPLib/Message.cpp                            (rev 0)
+++ trunk/apps/CppFCPLib/Message.cpp    2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,94 @@
+#include "Message.h"
+
+
+using namespace std;
+using namespace FCPLib;
+
+// Starts as a non-data message with no cached text.
+Message::Message()
+  : isDataType(false),
+    isReprValid(false)
+{}
+
+// Creates an empty message whose header line is `header`.
+Message::MessagePtr
+Message::factory(std::string &header){
+  Message::MessagePtr msg( new Message() );
+  msg->header.assign(header);
+  return msg;
+}
+
+// C-string overload: wraps `header` in a std::string and delegates.
+Message::MessagePtr
+Message::factory(const char *header){
+  std::string name(header);
+  Message::MessagePtr msg = Message::factory(name);
+  return msg;
+}
+
+// Removes a single trailing '\n' from `line`, if there is one.
+static void
+chompNewline(char *line){
+  const size_t len = strlen(line);
+  if (len > 0 && line[len-1] == '\n')
+    line[len-1] = 0;
+}
+
+// Reads one message from `s`: a header line followed by key=value lines, up
+// to a line that is exactly "End" or "EndMessage".
+// If the stream ends early, whatever was parsed so far is returned (possibly
+// a message with an empty header). Lines without '=' are skipped.
+Message::MessagePtr
+Message::factory(Server &s){
+  Message::MessagePtr m( new Message() );
+  char line[1000];   // per-call buffer; the old static one was shared by all callers
+  line[0] = 0;
+
+  // readln() returns 0 at end of stream and may leave the buffer untouched,
+  // so its result is checked before the buffer is looked at.
+  if (s.readln(line, sizeof(line)) <= 0)
+    return m;
+  chompNewline(line);
+  m->header = string(line);
+
+  for (;;) {
+    if (s.readln(line, sizeof(line)) <= 0)
+      break;   // stream closed mid-message
+    chompNewline(line);
+
+    if (!strcmp(line, "End") || !strcmp(line, "EndMessage"))
+      break;
+
+    char *val = strchr(line, '=');
+    if (!val)
+      continue;   // no '=' on this line; previously a NULL dereference
+    *val++ = 0;
+    m->fields[string(line)] = string(val);
+  }
+
+  return m;
+}
+
+// Sets (or overwrites) field `key` and marks the cached text as stale.
+void
+Message::setField(std::string key, std::string value) {
+  // TODO: should i check if a message can contain certain field?
+  fields[key] = value;
+  isReprValid = false;
+}
+
+// Returns the value stored for `key`, or an empty string if the field is
+// absent. Uses find() so that a lookup no longer inserts an empty field into
+// the message, as operator[] did.
+std::string
+Message::getField(const std::string &key) {
+  std::map<std::string, std::string>::const_iterator it = fields.find(key);
+  return (it == fields.end()) ? std::string() : it->second;
+}
+
+
+// Serialises the message: header line, then one "key=value" line per field.
+// A data-type message ends with "DataLength=<n>", "Data" and the raw payload
+// instead of "EndMessage". The result is cached in `repr` until setField()
+// invalidates it.
+const std::string&
+Message::toString() {
+  if (isReprValid)
+    return repr;
+  repr = header + "\n";
+  for (map<string, string>::iterator it = fields.begin(); it != fields.end(); ++it) {
+    if (isDataType && it->first == "Data")
+      continue;   // the payload is emitted last, after DataLength
+    repr += it->first + "=" + it->second + "\n";
+  }
+  if (isDataType) {
+    // find() rather than operator[], so serialising does not add a field.
+    map<string, string>::const_iterator data = fields.find("Data");
+    const string payload = (data == fields.end()) ? string() : data->second;
+
+    // size() is a size_t; the old "%d" did not match its type.
+    char length[32];
+    sprintf(length, "%lu", (unsigned long) payload.size());
+    repr += "DataLength=";
+    repr += string(length);
+    repr += "\n";
+    repr += "Data\n";
+    repr += payload;
+  } else {
+    repr += "EndMessage\n";
+  }
+  isReprValid = true;
+  return repr;
+}
+
+// Header (first line) of the message.
+const std::string&
+Message::getHeader() const
+{
+  return this->header;
+}

Added: trunk/apps/CppFCPLib/Message.h
===================================================================
--- trunk/apps/CppFCPLib/Message.h                              (rev 0)
+++ trunk/apps/CppFCPLib/Message.h      2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,40 @@
+
+
+#ifndef MESSAGE_H_
+#define MESSAGE_H_
+
+#include <string>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include "Server.h"
+
+namespace FCPLib {
+
+// A protocol message: a header line plus a map of key/value fields.
+// Instances are created through the factory() functions only.
+class Message {
+  std::string repr;     // cached toString() output
+  std::string header;
+
+  std::map<std::string, std::string> fields;
+
+  bool isDataType;
+  bool isReprValid;     // false whenever `repr` has to be rebuilt
+
+  Message();
+public:
+  typedef boost::shared_ptr<Message> MessagePtr;
+  // Create an empty message with the given header ...
+  static MessagePtr factory(std::string &header);
+  static MessagePtr factory(const char *header);
+  // ... or read a complete message from the server connection.
+  static MessagePtr factory(Server &s);
+
+  void setField(std::string key, std::string value);
+  // Not declared inline: the definition lives in Message.cpp, and an inline
+  // function must be defined in every translation unit that calls it.
+  std::string getField(const std::string &key);
+  const std::string& getHeader() const;
+
+  const std::string& toString();
+};
+
+}
+
+#endif
+
+

Added: trunk/apps/CppFCPLib/Node.cpp
===================================================================
--- trunk/apps/CppFCPLib/Node.cpp                               (rev 0)
+++ trunk/apps/CppFCPLib/Node.cpp       2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,44 @@
+
+#include "Node.h"
+#include "Message.h"
+#include "Log.h"
+
+using namespace FCPLib;
+
+// Builds an identifier of the form "id<seconds>" from the current time(0)
+// value, so two calls within the same second return the same string.
+std::string
+Node::_getUniqueId() {
+    char buffer[100];
+    const int now = (int) time(0);
+    sprintf(buffer, "id%d", now);
+    return string(buffer);
+}
+
+// Connects to the node and performs the hello handshake:
+//  1. picks a generated name if none was given,
+//  2. starts the NodeThread (which opens the connection) on the executor,
+//  3. queues a ClientHello job and blocks until its result has been posted.
+Node::Node(std::string name_, std::string host, int port)
+  : name(name_),
+    clientReqQueue( new TQueue<JobTicket::JobTicketPtr>() )
+{
+  if (name.size() == 0)
+    name = Node::_getUniqueId();
+  log().log(DEBUG, "Node started name=" + name + "\n");
+
+  nodeThread = new NodeThread(host, port, clientReqQueue);
+  executor.execute( nodeThread );
+
+  Message::MessagePtr hello = Message::factory("ClientHello");
+  hello->setField("Name", name);
+  hello->setField("ExpectedVersion", "2.0");
+
+  log().log(DEBUG, "Creating ClientHello\n");
+  JobTicket::JobTicketPtr helloJob( new JobTicket("__hello", hello) );
+  log().log(DEBUG, helloJob->toString());
+  clientReqQueue->put(helloJob);
+
+  log().log(DEBUG, "waiting for the NodeHello");
+  helloJob->wait(0);   // 0 selects the branch of wait() that waits for the result
+  log().log(DEBUG, "NodeHello arrived");
+//  cout << "Izlaz:\n" << job->getResponse()[0]->toString();
+}
+
+// Interrupts the executor that runs the NodeThread.
+Node::~Node()
+{
+  this->executor.interrupt();
+}

Added: trunk/apps/CppFCPLib/Node.h
===================================================================
--- trunk/apps/CppFCPLib/Node.h                         (rev 0)
+++ trunk/apps/CppFCPLib/Node.h 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,26 @@
+#ifndef NODE_H__
+#define NODE_H__
+
+#include <string>
+#include <memory>
+#include "zthread/Thread.h"
+#include "zthread/ThreadedExecutor.h"
+#include "TQueue.h"
+#include "NodeThread.h"
+
+namespace FCPLib {
+// Client-side handle for one node connection. Constructing it starts the
+// worker thread and completes the hello exchange; destroying it interrupts
+// the executor.
+class Node {
+public:
+  Node(std::string name, std::string host, int port);
+  ~Node();
+
+private:
+  static std::string _getUniqueId();
+
+  std::string name;
+  ZThread::CountedPtr< JobTicketQueue > clientReqQueue;
+  NodeThread *nodeThread;
+  ZThread::ThreadedExecutor executor;
+};
+}
+
+#endif

Added: trunk/apps/CppFCPLib/NodeThread.cpp
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.cpp                         (rev 0)
+++ trunk/apps/CppFCPLib/NodeThread.cpp 2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,114 @@
+
+
+#include "NodeThread.h"
+#include "Log.h"
+#include <ctime>
+
+using namespace FCPLib;
+using namespace ZThread;
+
+// Stores the shared request queue, opens the connection to host:port and
+// creates the empty job map.
+// NOTE(review): this constructor is declared throw(), yet constructing the
+// member `s` (Server) throws StdError when the connection cannot be set up.
+NodeThread::NodeThread(std::string &host,
+                       int port,
+                       ZThread::CountedPtr< JobTicketQueue > &clientReqQueue_) throw()
+  : clientReqQueue(clientReqQueue_),
+    s(host, port),
+    jobs( new std::map<std::string, JobTicket::JobTicketPtr>() )
+{}
+
+// Thread body. Until the thread is interrupted it alternates between
+//  - reading and dispatching a message when the socket has data, and
+//  - sending the next queued client request, if any.
+// Only ZThread::Synchronization_Exception is caught here.
+void NodeThread::run(){
+  Message::MessagePtr incoming;
+  JobTicket::JobTicketPtr request;
+  log().log(DETAIL, "FCPNode: manager thread starting");
+  try {
+    while (!Thread::interrupted()) {
+      //check for incoming message from node
+      log().log(NOISY, "_mgrThread: Testing for incoming message");
+      if (s.dataAvailable()){
+        log().log(DEBUG, "_mgrThread: Retrieving incoming message");
+        incoming = Message::factory(s);
+        log().log(DEBUG, "_mgrThread: Got incoming message, dispatching");
+        // dispatch the message
+        doMessage(incoming);
+      }
+      //check for incoming message from client
+      if (clientReqQueue->empty())
+        continue;
+      log().log(DEBUG, "_mgrThread: Got incoming client req");
+      request = clientReqQueue->get();
+      log().log(DEBUG, "_mgrThread: Got incoming client req from the queue");
+      log().log(DEBUG, request->toString());
+      sendClientReq(request);
+    }
+  } catch (ZThread::Synchronization_Exception& e) {
+    // do some cleanup
+  }
+}
+
+// Sends one queued request to the node.
+// Every job except "WatchGlobal" is first recorded in the job map under its
+// id. After the write, the send time is stamped and `reqSentLock` is released.
+void
+NodeThread::sendClientReq(JobTicket::JobTicketPtr &job)
+{
+  log().log(NOISY, "sendClientReq : top");
+  const bool tracked = (job->commandName() != "WatchGlobal");
+  if (tracked) {
+    log().log(NOISY, "sendClientReq : about to add the job to the map");
+    (*jobs)[job->getId()] = job;
+    log().log(NOISY, "sendClientReq : added the job to the map");
+  }
+
+  const std::string &text = job->getMessageText();
+  s.send(text);
+  job->timeQueued = (unsigned int) time(0);
+  job->reqSentLock.release();
+}
+
+//
+//void NodeThread::_hello(){
+//    string message;
+//    message = Message::toString("ClientHello", 2, "Name", 
this->name.c_str(), "ExpectedVersion", "2.0");
+//    messageQSend->put(message);
+//    logfile->log(DETAIL, message);
+//
+//    message = this->_rxMsg();
+//    Message* nodeHello = new Message(message);
+//    const char * tmp;
+//    if ((tmp = nodeHello->getField("FCPVersion")) != NULL){
+//        this->nodeFCPVersion = string(tmp);
+//    }
+//    if ((tmp = nodeHello->getField("Version")) != NULL){
+//        this->nodeVersion = string(tmp);
+//    }
+//    if ((tmp = nodeHello->getField("Testnet")) != NULL){
+//        this->nodeIsTestnet = (string(tmp) == "true") ? true : false;
+//    }
+//    if ((tmp = nodeHello->getField("CompressionCodes")) != NULL){
+//        this->compressionCodes = atoi(tmp);
+//    }
+//
+//    delete nodeHello;
+//}
+
+
+//
+//string NodeThread::_rxMsg(){
+//  string message = messageQReceive->get();
+//  logfile->log(DETAIL, message);
+//  return message;
+//}
+//
+//void NodeThread::_on_rxMsg(std::string& message){
+//  Message m(message);
+//
+//  logfile->log(DEBUG, m.toString());
+//}
+
+// Dispatches a message received from the node. Only "NodeHello" is handled:
+// it is attached to the "__hello" job, whose result is then posted. Any other
+// header is ignored.
+void
+NodeThread::doMessage(Message::MessagePtr &message)
+{
+  if (message->getHeader() != "NodeHello")
+    return;
+
+  // find() returns end() when no hello job is registered; dereferencing that
+  // (as the old code did unconditionally) is undefined behaviour.
+  std::map<std::string, JobTicket::JobTicketPtr>::iterator it = jobs->find("__hello");
+  if (it == jobs->end()) {
+    log().log(ERROR, "doMessage: NodeHello received but no __hello job is registered");
+    return;
+  }
+
+  JobTicket::JobTicketPtr job = it->second;
+  job->nodeResponse.push_back(message);
+  job->putResult();
+}
+

Added: trunk/apps/CppFCPLib/NodeThread.h
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.h                           (rev 0)
+++ trunk/apps/CppFCPLib/NodeThread.h   2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,40 @@
+#ifndef NODETHREAD_H__
+#define NODETHREAD_H__
+
+#include "zthread/Thread.h"
+#include "TQueue.h"
+#include "Log.h"
+#include "Server.h"
+#include <map>
+#include <string>
+#include "JobTicket.h"
+
+namespace FCPLib {
+
+class Node;
+
+// Queue type through which clients hand requests to the node thread.
+typedef TQueue<JobTicket::JobTicketPtr> JobTicketQueue;
+
+// Runnable that owns the server connection. Its loop reads and dispatches
+// incoming messages and sends queued client requests. Only Node, a friend,
+// can construct it.
+class NodeThread : public ZThread::Runnable {
+public:
+  void run();
+
+private:
+  friend class Node;
+  NodeThread(std::string &host, int port, ZThread::CountedPtr< JobTicketQueue > &clientReqQueue_) throw();
+
+  void sendClientReq(JobTicket::JobTicketPtr &job);
+  void doMessage(Message::MessagePtr &message);
+
+//  void _hello();
+//  static std::string _getUniqueId();
+//  std::string _rxMsg();
+//  void _on_rxMsg(std::string &message);
+
+//  ZThread::CountedPtr<Logger> logfile;
+  ZThread::CountedPtr< JobTicketQueue > clientReqQueue;
+  FCPLib::Server s;
+  ZThread::CountedPtr< std::map<std::string, JobTicket::JobTicketPtr> > jobs;
+};
+
+}
+#endif
+

Added: trunk/apps/CppFCPLib/Server.cpp
===================================================================
--- trunk/apps/CppFCPLib/Server.cpp                             (rev 0)
+++ trunk/apps/CppFCPLib/Server.cpp     2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,150 @@
+
+#include "Server.h"
+
+#include <sys/types.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/poll.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstring>
+#include <string>
+
+#include "DefaultValues.h"
+#include "Exceptions.h"
+#include "Log.h"
+
+using namespace FCPLib;
+
+
+// Opens a TCP connection to host:port.
+// An empty `host` is replaced (in the caller's string) by defaultFCPHost, and
+// a non-positive `port` by defaultFCPPort.
+// Throws StdError if the name cannot be resolved, the socket cannot be
+// created, or the connection fails.
+Server::Server(std::string &host, int port){
+  /* Use local host as default address */
+  if (!host.size())
+    host = defaultFCPHost;
+
+  /* Use default port */
+  if (port<=0)
+    port = defaultFCPPort;
+
+  /* Resolve hostname. gethostbyname() reports failures through h_errno, not
+     errno, so the text comes from hstrerror(). */
+  struct hostent *he = gethostbyname(host.c_str());
+  if (he == NULL)
+    throw StdError(__FUNCTION__, "Failed to resolve", hstrerror(h_errno));
+
+  /* Snag socket */
+  if ((sockfd = socket(PF_INET, SOCK_STREAM, 0))<0)
+    throw StdError(__FUNCTION__, "Failed to get socket", strerror(errno));
+
+  struct sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_port   = htons(port);
+  addr.sin_addr   = *((struct in_addr *)he->h_addr);
+
+  /* Connect */
+  if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr))<0) {
+    const int connectErrno = errno;   // close() may overwrite errno
+    close(sockfd);
+    errno = connectErrno;             // StdError also records errno itself
+    throw StdError(__FUNCTION__, "Failed to connect", strerror(connectErrno));
+  }
+}
+
+// Closes the socket descriptor.
+Server::~Server()
+{
+  close(this->sockfd);
+}
+
+// Reads up to `n` bytes into `vptr`, looping over short reads.
+// Returns the number of bytes actually read, which is less than `n` only when
+// the stream ends first. A read interrupted by a signal is retried; any other
+// read error throws StdError.
+ssize_t Server::readn(void *vptr, size_t n){
+  char *cursor = (char*) vptr;
+  size_t remaining = n;
+
+  while (remaining > 0) {
+    ssize_t got = read(sockfd, cursor, remaining);
+    if (got == 0)
+      break;   // EOF
+    if (got < 0) {
+      if (errno != EINTR)
+        throw StdError(__FUNCTION__, "", strerror(errno));
+      got = 0;   // interrupted before anything was read: try again
+    }
+    remaining -= got;
+    cursor += got;
+  }
+
+  return (n - remaining);
+}
+
+// Writes exactly `n` bytes from `vptr`, looping over short writes.
+// Returns `n`. A write interrupted by a signal is retried. Any other failure,
+// including a write that makes no progress, throws StdError.
+ssize_t Server::writen(const void *vptr, size_t n){
+  const char *ptr = (const char*) vptr;
+  size_t nleft = n;
+
+  while (nleft > 0) {
+    ssize_t nwritten = write(sockfd, ptr, nleft);
+    if (nwritten < 0) {
+      // errno is meaningful only when write() actually failed. The old code
+      // also consulted it for a 0 return and could spin on a stale EINTR.
+      if (errno != EINTR)
+        throw StdError(__FUNCTION__, "", strerror(errno));
+      nwritten = 0;
+    } else if (nwritten == 0) {
+      throw StdError(__FUNCTION__, "write made no progress", "");
+    }
+    nleft -= nwritten;
+    ptr += nwritten;
+  }
+
+  return (n);
+}
+
+// Reads one line, one byte at a time, into `vptr` (capacity `maxlen`).
+// Reading stops after a '\n' (which is stored), at end of stream, or when
+// maxlen-1 bytes have been stored. The buffer is always NUL-terminated when
+// maxlen > 0.
+// Returns the number of bytes stored, excluding the terminator; 0 means end
+// of stream with nothing read. A read interrupted by a signal is retried;
+// any other read error throws StdError, as readn() does.
+ssize_t Server::readln(void *vptr, size_t maxlen){
+  char *ptr = (char *)vptr;
+  size_t stored = 0;
+
+  if (maxlen == 0)
+    return 0;   // no room even for the terminator
+
+  while (stored + 1 < maxlen) {
+    char c;
+    const ssize_t rc = read(sockfd, &c, 1);
+    if (rc == 1) {
+      ptr[stored++] = c;
+      if (c == '\n')
+        break;
+    } else if (rc == 0) {
+      break;   // EOF
+    } else if (errno != EINTR) {
+      throw StdError(__FUNCTION__, "", strerror(errno));
+    }
+    // rc < 0 with EINTR: nothing was read, go round again
+  }
+  ptr[stored] = 0;
+  return stored;
+}
+
+// Logs the outgoing text at DEBUG level, then writes all of it to the socket.
+void Server::send(const std::string &s){
+  const std::string banner = "Sending:\n"+s+"-----------------\n";
+  log().log(DEBUG, banner);
+  writen(s.c_str(), s.length());
+}
+
+// Polls the socket for up to pollTimeout and reports whether data can be read.
+// Returns false on timeout, or when poll() was interrupted by a signal.
+// Throws StdError by value, like the rest of this file, on a poll failure,
+// POLLERR or POLLHUP. The old `throw new StdError` threw a pointer, which
+// leaked and could not be caught by reference.
+bool Server::dataAvailable(){
+  struct pollfd mypol;
+  mypol.fd = sockfd;
+  mypol.events = POLLERR | POLLHUP | POLLIN;
+  mypol.revents = 0;
+
+  const int pollret = poll(&mypol, 1, pollTimeout);
+
+  if (pollret<0) {
+    if (errno == EINTR)
+      return false;   // interrupted, not failed
+    throw StdError(__FUNCTION__, "poll error", strerror(errno));
+  }
+
+  if (!pollret)
+    return false;
+
+  /* if we get an error, die */
+  if (mypol.revents&POLLERR)
+    throw StdError(__FUNCTION__, "poll error", strerror(errno));
+
+  if (mypol.revents&POLLHUP)
+    throw StdError(__FUNCTION__, "poll hangup", strerror(errno));
+
+  return (mypol.revents&POLLIN) != 0;
+}

Added: trunk/apps/CppFCPLib/Server.h
===================================================================
--- trunk/apps/CppFCPLib/Server.h                               (rev 0)
+++ trunk/apps/CppFCPLib/Server.h       2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,25 @@
+
+#ifndef SERVER_H__
+#define SERVER_H__
+
+#include <string>
+
+namespace FCPLib {
+
+// Thin wrapper around a connected socket. Only NodeThread, a friend, can
+// create one. The descriptor is closed by the destructor.
+class Server {
+  friend class NodeThread;
+  int sockfd;
+  Server(std::string &host, int port=-1);
+  // Not copyable: a copy would share `sockfd`, and both destructors would
+  // close it. Declared private and deliberately left undefined.
+  Server(const Server&);
+  Server& operator=(const Server&);
+public:
+  ~Server();
+  // Read up to n bytes; returns the count read.
+  ssize_t readn(void *vptr, size_t n);
+  // Read one line into a buffer of maxlen bytes.
+  ssize_t readln(void *vptr, size_t maxlen);
+  // Write exactly n bytes.
+  ssize_t writen(const void *vptr, size_t n);
+  // Log and write the whole string.
+  void send(const std::string &s);
+  // True when the socket has data ready to read.
+  bool dataAvailable();
+};
+
+}
+
+
+#endif

Added: trunk/apps/CppFCPLib/TQueue.h
===================================================================
--- trunk/apps/CppFCPLib/TQueue.h                               (rev 0)
+++ trunk/apps/CppFCPLib/TQueue.h       2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,38 @@
+#ifndef TQUEUE_H__
+#define TQUEUE_H__
+
+#include <deque>
+#include "zthread/Thread.h"
+#include "zthread/Condition.h"
+#include "zthread/Mutex.h"
+#include "zthread/Guard.h"
+
+namespace FCPLib {
+
+// FIFO queue protected by a mutex, with a condition variable that wakes
+// consumers. get() blocks while the queue is empty.
+template<class T> class TQueue {
+public:
+  TQueue() : cond(lock) {}
+
+  // Appends `item` and signals one waiting consumer.
+  void put(T item) {
+    ZThread::Guard<ZThread::Mutex> guard(lock);
+    data.push_back(item);
+    cond.signal();
+  }
+
+  // Removes and returns the oldest item, waiting until one is available.
+  T get() {
+    ZThread::Guard<ZThread::Mutex> guard(lock);
+    while (data.empty()) {
+      cond.wait();
+    }
+    T oldest = data.front();
+    data.pop_front();
+    return oldest;
+  }
+
+  // True when no items are queued at the moment of the call.
+  bool empty() {
+    ZThread::Guard<ZThread::Mutex> guard(lock);
+    const bool none = data.empty();
+    return none;
+  }
+
+private:
+  ZThread::Mutex lock;
+  ZThread::Condition cond;
+  std::deque<T> data;
+};
+
+}
+#endif

Added: trunk/apps/CppFCPLib/main.cpp
===================================================================
--- trunk/apps/CppFCPLib/main.cpp                               (rev 0)
+++ trunk/apps/CppFCPLib/main.cpp       2007-06-01 16:56:01 UTC (rev 13446)
@@ -0,0 +1,22 @@
+#include <iostream>
+#include <string>
+#include "Server.h"
+#include "Node.h"
+
+using namespace std;
+using namespace FCPLib;
+
+// Smoke test: constructs a Node named "123" with default host and port, which
+// performs the hello exchange, then tears it down again.
+int main()
+{
+//  char line[100];
+//     Server s;
+//     s.send("ClientHello\nName=123\nExpectedVersion=2.0\nEndMessage\n");
+//
+//     for(;;){
+//       s.readln(line, 100);
+//       cout << line;
+//     }
+
+  {
+    Node node("123", "", -1);
+  }
+  return 0;
+}


Reply via email to