Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=985334&r1=985333&r2=985334&view=diff ============================================================================== --- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original) +++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Fri Aug 13 19:43:09 2010 @@ -1,13 +1,16 @@ #include <iostream> #include <errno.h> #include <string.h> -#include "UFServer.H" -#include "UFServer.H" - #include <sys/types.h> #include <sys/wait.h> -#include "UFStatSystem.H" -#include "UFStats.H" +#include <deque> +#include <list> + +#include <UF.H> +#include <UFStatSystem.H> +#include <UFStats.H> +#include <UFServer.H> +#include <UFConf.H> using namespace std; @@ -15,31 +18,15 @@ using namespace std; //TODO: create monitoring port later // // -static string getPrintableTime() -{ - char asctimeDate[32]; - asctimeDate[0] = '\0'; - time_t now = time(0); - asctime_r(localtime(&now), asctimeDate); - - string response = asctimeDate; - size_t loc = response.find('\n'); - if(loc != string::npos) - response.replace(loc, 1, ""); - return response; -} - void UFServer::reset() { _addressToBindTo = "0"; - _listenFd = -1; - _port = 0; + _listenSockets.clear(); _creationTime = 0; MAX_THREADS_ALLOWED = 8; MAX_PROCESSES_ALLOWED = 1; MAX_ACCEPT_THREADS_ALLOWED = 1; - UF_STACK_SIZE = 8192; _threadChooser = 0; } @@ -57,9 +44,13 @@ struct NewConnUF : public UF return; UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs; - ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio); + // increment connections handled stat UFStatSystem::increment(UFStats::connectionsHandled); + // Keep track of current connections + UFStatSystem::increment(UFStats::currentConnections); + ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio); + UFStatSystem::increment(UFStats::currentConnections, -1); //clear the client connection delete fiberStartingArgs->ufio; @@ -89,21 +80,22 @@ struct AcceptRunner : public UF //add the scheduler for this UFIO* ufio = new UFIO(UFScheduler::getUF()); - int fd = ufserver->getListenFd(); - if(fd == -1) - fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/); - if(fd < 0) + if (socket.fd == -1) + { + socket.fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), socket.port /*, deal w/ backlog*/); + } + if (socket.fd < 0) { cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl; exit(1); } - if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/)) + if(!ufio || !ufio->setFd(socket.fd, false/*has already been made non-blocking*/)) { cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl; return; } - ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0); + ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, socket.port, ufserver, 0, 0); } AcceptRunner(bool registerMe = false) { @@ -113,10 +105,39 @@ struct AcceptRunner : public UF UF* createUF() { return new AcceptRunner(); } static AcceptRunner* _self; static int _myLoc; + UFServer::ListenSocket socket; }; int AcceptRunner::_myLoc = -1; AcceptRunner* AcceptRunner::_self = new AcceptRunner(true); +struct PerThreadInitializer : public UF +{ + void run() + { + if(!_startingArgs) + return; +// Add conf manager for thread + UFConfManager *confManager = new UFConfManager; + int ret = pthread_setspecific(UFConfManager::threadSpecificKey, confManager); + cerr << getpid() << ":::Adding thread specific UFConfManager key " << UFConfManager::threadSpecificKey << " " << confManager << " " << ret << ", tid : " << pthread_self() << endl; + + UFServer *_server = (UFServer *)_startingArgs; + _server->postThreadCreation(); + } + + UF* createUF() { return new PerThreadInitializer(); } + + PerThreadInitializer(bool registerMe = false) + { + if(registerMe) + _myLoc = UFFactory::getInstance()->registerFunc((UF*)this); + } + static PerThreadInitializer* _self; + static int _myLoc; +}; +int PerThreadInitializer::_myLoc = -1; +PerThreadInitializer* PerThreadInitializer::_self = new PerThreadInitializer(true); + void UFServer::startThreads() { preThreadCreation(); @@ -129,7 +150,14 @@ void UFServer::startThreads() //start the IO threads for(; i<MAX_THREADS_ALLOWED; i++) { - UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>()); + list<UF*>* ufsToAdd = new list<UF*>(); + + PerThreadInitializer *pti = new PerThreadInitializer; + pti->_startingArgs = this; + ufsToAdd->push_back(pti); + + UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd); + cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl; usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later //add the io threads to the thread chooser @@ -152,10 +180,19 @@ void UFServer::startThreads() //start the accept thread for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++) { - AcceptRunner* ar = new AcceptRunner(); - ar->_startingArgs = this; list<UF*>* ufsToAdd = new list<UF*>(); - ufsToAdd->push_back(ar); + for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter) + { + AcceptRunner* ar = new AcceptRunner(); + ar->_startingArgs = this; + ar->socket = *iter; + ufsToAdd->push_back(ar); + } + + PerThreadInitializer *pti = new PerThreadInitializer(); + pti->_startingArgs = this; + ufsToAdd->push_back(pti); + UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd); usleep(5000); //TODO: let the thread finish initializing addThread("ACCEPT", 0, thread[i]); @@ -178,14 +215,17 @@ void UFServer::run() if(!_threadChooser) _threadChooser = new UFServerThreadChooser(); - //bind to the socket (before the fork - _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog - if(_listenFd < 0) + for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter) { - cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl; - exit(1); + //bind to the socket (before the fork) + iter->fd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), iter->port); //TODO:set the backlog + if(iter->fd < 0) + { + cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl; + exit(1); + } } - + if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode) { preThreadRun(); @@ -214,6 +254,7 @@ void UFServer::run() postForkPreRun(); preThreadRun(); startThreads(); + postThreadRun(); exit(0); } cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C?rev=985334&r1=985333&r2=985334&view=diff ============================================================================== --- trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C (original) +++ trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C Fri Aug 13 19:43:09 2010 @@ -1,15 +1,16 @@ #include <string.h> #include <stdio.h> -#include "UFStatSystem.H" -#include "UF.H" -#include "UFIO.H" -#include "UFServer.H" +#include <UFStatSystem.H> +#include <UF.H> +#include <UFIO.H> +#include <UFServer.H> #include <iostream> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> +using namespace std; UFServer *UFStatSystem::server; std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num; @@ -126,9 +127,8 @@ bool UFStatSystem::get_current(uint32_t bool UFStatSystem::get_current(const char *stat_name, long long *stat_val) { uint32_t stat_num; - if(!getStatNum(stat_name, stat_num)) { + if(!getStatNum(stat_name, stat_num)) return false; - } return get_current(stat_num, stat_val); } @@ -349,8 +349,9 @@ void StatCommandProcessing::run() char readbuf[1024]; std::string readData; - while(1) { - if((readbytes = ufio->read(readbuf, 1023, 0)) <= 0) + while(1) + { + if((readbytes = ufio->read(readbuf, 1023, 60000000)) <= 0) break; readData.append(readbuf, readbytes); @@ -358,25 +359,26 @@ void StatCommandProcessing::run() if(readData.find("\r\n") == string::npos) continue; - if(readData.find("stats_current") != string::npos) { + if(readData.find("stats_current") != string::npos) + { // Force a collect before printing out the stats UFStatSystem::collect(); std::stringstream printbuf; UFStatCollector::printStats(printbuf); - if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) { + if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) //failed write, break to close connection break; - } } - else if (readData.find("stats") != string::npos) { + else if (readData.find("stats") != string::npos) + { std::stringstream printbuf; UFStatCollector::printStats(printbuf); - if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) { + if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) //failed write, break to close connection break; - } } - else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) { + else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) + { std::vector<std::string> stats; char stat_name[MAX_STAT_NAME_LENGTH]; bzero(stat_name, MAX_STAT_NAME_LENGTH); @@ -387,49 +389,49 @@ void StatCommandProcessing::run() bool get_current = false; if(readData.find("stat ") != string::npos) start += strlen("stat "); - else { + else + { start += strlen("stat_current "); get_current = true; } - while(sscanf(start, "%s%n", stat_name, &next) == 1) { + while(sscanf(start, "%s%n", stat_name, &next) == 1) + { // Prefix support char *prefix_end = strchr(start, '*'); - if(prefix_end != NULL) { + if(prefix_end != NULL) + { std::string prefix; prefix.assign(start, prefix_end-start); // Get all stats with the prefix UFStatCollector::getStatsWithPrefix(prefix, stats); } - else { + else stats.push_back(stat_name); - } bzero(stat_name, MAX_STAT_NAME_LENGTH); start+=next; } std::stringstream printbuf; UFStatCollector::printStats(stats, printbuf, get_current); - if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) { + if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) //failed write, break to close connection break; - } } - else if (readData.find("help") != string::npos) { - if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1) { + else if (readData.find("help") != string::npos) + { + if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1) //failed write, break to close connection break; - } } - else if (readData.find("quit") != string::npos) { + else if (readData.find("quit") != string::npos) break; - } - else { + else + { if ((ufio->write(cmdUnrec, sizeof(cmdUnrec)-1) == -1) || - (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)) { + (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)) //failed write, break to close connection break; - } } readData.clear(); } // END while loop @@ -444,12 +446,24 @@ int StatCommandListenerRun::_myLoc = -1; StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true); void StatCommandListenerRun::run() { - int fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000); - if(fd < 0) + int fd; + unsigned int counter = 0; + do { - cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl; - return; - } + UFStatCollector::_statCommandPort += counter; + fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000); + if(fd < 0) + { + cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl; + if(counter++ == 50) //try upto 50 times + return; + continue; + } + else + break; + } while(1); + cerr<<"setup stat command port at "<<UFStatCollector::_statCommandPort; + UFIO* ufio = new UFIO(UFScheduler::getUF()); if(!ufio) @@ -461,7 +475,7 @@ void StatCommandListenerRun::run() ufio->setFd(fd, false); StatThreadChooser ufiotChooser; - ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, 0, 0, 0); + ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, UFStatCollector::_statCommandPort, 0, 0); } void* UFStatCollector::scheduler(void *args) @@ -469,70 +483,66 @@ void* UFStatCollector::scheduler(void *a if(!args) return 0; + cerr<<getPrintableTime()<<" "<<getpid()<<": created stats thread (with I/O) - "<<pthread_self()<<endl; // add jobs to scheduler UFScheduler ufs; + //insertion has to be done in a LIFO (stack) manner // stat collector ufs.addFiberToScheduler(new CollectorRunner()); - - // set thread for stat command listener to run on - StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self()); - + // stat command listener port + ufs.addFiberToScheduler(new StatCommandListenerRun()); + ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs); // io scheduler ufs.addFiberToScheduler(new IORunner()); - // stat command listener - ufs.addFiberToScheduler(new StatCommandListenerRun()); - ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs); + // set thread for stat command listener to run on + StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self()); + ufs.runScheduler(); return 0; } //---------------------------------------------------------- -void UFStatCollector::printStats(std::stringstream &printbuf) { - printbuf << "Cache stats: \n" +void UFStatCollector::printStats(std::stringstream &printbuf) +{ + printbuf << "Cache stats: \n" "-----------------------------------------------------------------------------\n"; - - printbuf << "TIME " << _startTime <<"\n"; + printbuf << "TIME " << _startTime <<"\n"; - UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self()); - UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread(); - statsMutex.lock(running_user_fiber); + UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self()); + UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread(); + statsMutex.lock(running_user_fiber); - for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin(); - it != UFStatSystem::global_stats.end(); it++) { - if(it->value != 0 ) { - printbuf << "STAT " << it->name << " " << it->value << "\n"; - } - } - statsMutex.unlock(running_user_fiber); + for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin(); + it != UFStatSystem::global_stats.end(); it++) + printbuf << "STAT " << it->name << " " << it->value << "\n"; + statsMutex.unlock(running_user_fiber); - printbuf << "END\n"; + printbuf << "END\n"; } -void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) { - // Print only non zero stats +void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) +{ long long stat_val = 0; bool stat_get_status; - if(current) { + if(current) stat_get_status = UFStatSystem::get_current(stat_name, &stat_val); - } - else { + else stat_get_status = UFStatSystem::get(stat_name, &stat_val); - } - if(stat_get_status && stat_val != 0) { + //if(stat_get_status && stat_val != 0) { + if(stat_get_status) printbuf << "STAT " << stat_name << " " << stat_val << "\n"; - } } -void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) { - printbuf << "TIME " << _startTime <<"\n"; +void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) +{ + printbuf << "TIME " << _startTime <<"\n"; for(std::vector<std::string>::const_iterator it = stat_names.begin(); it != stat_names.end(); - it++) { + it++) printStat(it->c_str(), printbuf, current); - } printbuf << "END\n"; } @@ -544,11 +554,11 @@ UFStatCollector::getStatsWithPrefix(cons statsMutex.lock(running_user_fiber); // Get all stats which start with stat_prefix for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin(); - it != UFStatSystem::global_stats.end(); it++) { + it != UFStatSystem::global_stats.end(); it++) + { size_t found = it->name.find(stat_prefix); - if(found == 0) { + if(!found) stat_names.push_back(it->name); - } } statsMutex.unlock(running_user_fiber); } Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=985334&r1=985333&r2=985334&view=diff ============================================================================== --- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (original) +++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Fri Aug 13 19:43:09 2010 @@ -1,6 +1,7 @@ -#include "UFStats.H" -#include "UFStatSystem.H" +#include <UFStats.H> +#include <UFStatSystem.H> +uint32_t UFStats::currentConnections; uint32_t UFStats::connectionsHandled; uint32_t UFStats::txnSuccess; uint32_t UFStats::txnFail; @@ -12,6 +13,7 @@ namespace UFStats { void registerStats(bool lock_needed) { + UFStatSystem::registerStat("connections.current", ¤tConnections, lock_needed); UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed); UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed); UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed); Modified: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=985334&r1=985333&r2=985334&view=diff ============================================================================== --- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (original) +++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Fri Aug 13 19:43:09 2010 @@ -21,14 +21,15 @@ #include <string> #include <stdio.h> -#include "UF.H" -#include "UFIO.H" -#include "UFServer.H" -#include "UFConnectionPool.H" +#include <ufcore/UF.H> +#include <ufcore/UFIO.H> +#include <ufcore/UFServer.H> +#include <ufcore/UFConnectionPool.H> #include <vector> using namespace std; +unsigned int NUM_REQUESTS_TO_RUN = 0; unsigned short int NUM_THREADS = 1; unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN = 1; unsigned int NUM_CONNECTIONS_PER_FIBER = 1; @@ -79,10 +80,10 @@ ResponseInfoObject overallInfo; pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER; pthread_key_t threadUpdateOverallInfo; -int GET_RESPONSE_TIMEOUT = 1*1000*1000; +TIME_IN_US GET_RESPONSE_TIMEOUT = -1; string DOUBLE_NEWLINE = "\r\n\r\n"; unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length(); -bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed) +bool readData(UFIO* ufio, bool& connClosed) { string result; char buf[4096]; @@ -92,7 +93,7 @@ bool readData(UFIO* ufio, unsigned int t bool okToExitToEnd = false; while(1) { - int num_bytes_read = ufio->read(buf, 4095, 0); + int num_bytes_read = ufio->read(buf, 4095, GET_RESPONSE_TIMEOUT); if(num_bytes_read <= 0) { if(okToExitToEnd && (num_bytes_read == 0)) @@ -101,8 +102,13 @@ bool readData(UFIO* ufio, unsigned int t connClosed = true; return true; } + cerr<<"bytes read = "<<num_bytes_read<<" with errno = "<<strerror(ufio->getErrno())<<endl; return false; } + /* + else + cerr<<"read"<<string(buf, num_bytes_read)<<endl; + */ result.append(buf, num_bytes_read); @@ -147,6 +153,7 @@ bool readData(UFIO* ufio, unsigned int t (result.length() > (endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength))) //dont support pipelining yet { cerr<<"read more than supposed to"<<endl; + cerr<<"read "<<result; return false; } else if(result.length() == endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength) @@ -158,10 +165,12 @@ bool readData(UFIO* ufio, unsigned int t unsigned int SLEEP_BETWEEN_CONN_SETUP = 0; -int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000; +TIME_IN_US CONNECT_AND_REQUEST_TIMEOUT = -1; bool writeData(UFIO* ufio, const string& data) { int amt_written = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT); + if(amt_written <= 0) + cerr<<"write failed "<<ufio->getErrno()<<endl; return ((amt_written == (int)data.length()) ? true : false); } @@ -175,7 +184,7 @@ UFIO* getConn(ResponseInfoObject* rIO) struct timeval start,finish; gettimeofday(&start, 0); - UFIO* ufio = cpool->getConnection(remote_addr); + UFIO* ufio = cpool->getConnection(remote_addr, true, CONNECT_AND_REQUEST_TIMEOUT); if(!ufio) { if(random()%100 < 10) @@ -221,7 +230,7 @@ void run_handler() return; //do the requests - unsigned short int num_requests_run = 0; + unsigned int num_requests_run = 0; while(num_requests_run++ < NUM_REQUESTS_PER_FIBER) { if(INTER_SEND_SLEEP) @@ -258,7 +267,7 @@ void run_handler() bool connClosed = false; - if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed)) + if(!readData(ufio, connClosed)) { if(random()%100 < 10) cerr<<"bailing since read data failed "<<strerror(errno)<<endl; @@ -300,7 +309,7 @@ void ClientUF::run() ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo); if(!rIO) return; - unsigned short int num_requests_run = 0; + unsigned int num_requests_run = 0; while(num_requests_run++ < NUM_CONNECTIONS_PER_FIBER) { //wait if told to do so @@ -405,16 +414,13 @@ void SetupClientUF::run() { if(!rIO.num_user_fibers_running) break; - else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) && - (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON) - ) - { - cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl; - break; - } UF::gusleep(5000000); - cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" ("<<(rIO.num_attempt*100/(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN))<<"%)"<<endl; + unsigned short int threadCompletionPercent = (rIO.num_attempt*100)/NUM_REQUESTS_TO_RUN; + cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<NUM_REQUESTS_TO_RUN<<" ("<<threadCompletionPercent<<"%)"<<endl; + + if(threadCompletionPercent > THREAD_COMPLETION_PERCENT_TO_BAIL_ON) + break; } } @@ -551,6 +557,7 @@ void print_usage() int main(int argc, char** argv) { + unsigned long long int DELAY_BETWEEN_STARTING_THREADS_IN_US = 0; if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0) { cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl; @@ -560,10 +567,13 @@ int main(int argc, char** argv) string rem_port = "80"; string rem_addr = "127.0.0.1"; char ch; - while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1) + while ((ch = getopt(argc, argv, "M:Z:U:x:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1) { switch (ch) { + case'x': + DELAY_BETWEEN_STARTING_THREADS_IN_US = atoi(optarg); + break; case 'Z': sleepShouldBeRandom = atoi(optarg); break; @@ -609,14 +619,10 @@ int main(int argc, char** argv) HTTP_BASE_REQ_STRING = optarg; break; case 'c': - CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000; - if(CONNECT_AND_REQUEST_TIMEOUT <= 0) - CONNECT_AND_REQUEST_TIMEOUT = -1; + CONNECT_AND_REQUEST_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1; break; case 'd': - GET_RESPONSE_TIMEOUT = atoi(optarg)*1000; - if(GET_RESPONSE_TIMEOUT <= 0) - GET_RESPONSE_TIMEOUT = -1; + GET_RESPONSE_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1; break; case 's': INTER_SEND_SLEEP = atoi(optarg)*1000; @@ -642,6 +648,7 @@ int main(int argc, char** argv) remote_addr = rem_addr + ":" + rem_port; print_info(); + NUM_REQUESTS_TO_RUN = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN; //create the threads pthread_t* thread = new pthread_t[NUM_THREADS]; @@ -651,6 +658,11 @@ int main(int argc, char** argv) list<UF*>* ufList = new list<UF*>(); ufList->push_back(new SetupClientUF()); UFIO::ufCreateThreadWithIO(&thread[i], ufList); + if(DELAY_BETWEEN_STARTING_THREADS_IN_US) + { + cerr<<"sleeping for "<<DELAY_BETWEEN_STARTING_THREADS_IN_US<<endl; + usleep(DELAY_BETWEEN_STARTING_THREADS_IN_US); + } }
