http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/process.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/process.cxx b/core/sqf/monitor/linux/process.cxx index 9b9ee00..a39a589 100644 --- a/core/sqf/monitor/linux/process.cxx +++ b/core/sqf/monitor/linux/process.cxx @@ -2589,27 +2589,46 @@ bool CProcess::Create (CProcess *parent, void* tag, int & result) // Take fork semaphore. We need to wait until parent indicates // it is ok to proceed. Pipes between parent and child need to // be set up before child can continue. + bool sem_log_error = false; int sem_rc; + int err = 0; struct timeval logTime; struct tm *ltime; + struct timespec ts; - gettimeofday(&logTime, NULL); - ltime = localtime(&logTime.tv_sec); + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) + { + err = errno; + gettimeofday(&logTime, NULL); + ltime = localtime(&logTime.tv_sec); + snprintf(la_buf, sizeof(la_buf), + "%02d/%02d/%02d-%02d:%02d:%02d " + "[CProcess::Create], clock_gettime(CLOCK_REALTIME)," + " Child can't get time, %s (%d), program %s, (pid=%d).\n" + , ltime->tm_mon+1, ltime->tm_mday, ltime->tm_year-100, ltime->tm_hour, ltime->tm_min, ltime->tm_sec + , strerror(err), err + , filename, getpid()); + write (2, la_buf, strlen(la_buf)); + } + ts.tv_sec += 1; - struct timespec ts; - ts.tv_sec = 1; - ts.tv_nsec = 0; env = getenv( "MON_CREATE_SEM_DELAY" ); if (env && isdigit(*env)) { ts.tv_sec = atol(env); } - int err; + + env = getenv( "MON_CREATE_SEM_LOG_ERROR" ); + if (env && isdigit(*env)) + { + int val = atoi(env); + sem_log_error = (val != 0) ? true : false; + } do { sem_rc = sem_timedwait(MyNode->GetMutex(), &ts); err = errno; - if ( err == ETIMEDOUT ) + if ( sem_log_error && err == ETIMEDOUT ) { gettimeofday(&logTime, NULL); ltime = localtime(&logTime.tv_sec); @@ -2625,7 +2644,7 @@ bool CProcess::Create (CProcess *parent, void* tag, int & result) } while (sem_rc == -1 && (err == EINTR || err == ETIMEDOUT)); - if ( sem_rc == -1 && !(err == EINTR || err == ETIMEDOUT)) + if ( sem_log_error && sem_rc == -1 && !(err == EINTR || err == ETIMEDOUT)) { gettimeofday(&logTime, NULL); ltime = localtime(&logTime.tv_sec); @@ -3319,6 +3338,10 @@ void CProcess::Exit( CProcess *parent ) case ProcessType_NameServer: if ( IsAbended() ) { + if (!Clone) + { + NameServer->NameServerExited(); + } if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) trace_printf("%s@%d" " - NameServer abended" "\n", method_name, __LINE__); } @@ -4095,6 +4118,7 @@ void CProcess::Switch( CProcess *parent ) CProcessContainer::CProcessContainer (void) :numProcs_(0) ,nodeContainer_(false) + ,processNameFormatLong_(true) ,nameMap_(NULL) ,pidMap_(NULL) ,head_(NULL) @@ -4121,12 +4145,22 @@ CProcessContainer::CProcessContainer (void) abort(); } +#ifndef NAMESERVER_PROCESS + char *env = getenv("SQ_MON_PROCESS_NAME_FORMAT_LONG"); + if ( env && isdigit(*env) ) + { + int val = atoi(env); + processNameFormatLong_ = (val != 0) ? true : false; + } +#endif + TRACE_EXIT; } CProcessContainer::CProcessContainer( bool nodeContainer ) :numProcs_(0) ,nodeContainer_(nodeContainer) + ,processNameFormatLong_(true) ,nameMap_(NULL) ,pidMap_(NULL) ,head_(NULL) @@ -4161,6 +4195,15 @@ CProcessContainer::CProcessContainer( bool nodeContainer ) abort(); } +#ifndef NAMESERVER_PROCESS + char *env = getenv("SQ_MON_PROCESS_NAME_FORMAT_LONG"); + if ( env && isdigit(*env) ) + { + int val = atoi(env); + processNameFormatLong_ = (val != 0) ? true : false; + } +#endif + if ( nodeContainer_ ) { nameMap_ = new nameMap_t; @@ -4775,46 +4818,85 @@ void CProcessContainer::Bcast (struct message_def *msg) char *CProcessContainer::BuildOurName( int nid, int pid, char *name ) { - int i; - int rem; - int cnt[4]; - const char method_name[] = "CProcessContainer::BuildOurName"; TRACE_ENTRY; - // Convert Pid into base 35 acsii - cnt[0] = pid / 42875; - rem = pid - ( cnt[0] * 42875 ); - cnt[1] = rem / 1225; - rem -= ( cnt[1] * 1225 ); - cnt[2] = rem / 35; - rem -= ( cnt[2] * 35 ); - cnt[3] = rem; + int i; + int rem; + int cnt[6]; + + if (!processNameFormatLong_) + { + // Convert Pid into base 35 acsii + cnt[0] = pid / 42875; // (35 * 35 * 35) + rem = pid - ( cnt[0] * 42875 ); + cnt[1] = rem / 1225; // (35 * 35) + rem -= ( cnt[1] * 1225 ); + cnt[2] = rem / 35; + rem -= ( cnt[2] * 35 ); + cnt[3] = rem; + + // Process name format long: '$Zxxpppp' xx = nid, pppp = pid - // Convert Nid into base 16 acsii - sprintf(name,"$Z%2.2X",nid); - for(i=3; i>=0; i--) - { - if( cnt[i] < 10 ) - { - name[i+4] = '0'+cnt[i]; - } - else + // Convert Nid into base 16 acsii + sprintf(name,"$Z%2.2X",nid); + + // Convert Pid into base 36 ascii + for(i=3; i>=0; i--) { - cnt[i] -= 10; - // we are skipping cap 'o' because it looks like zero. - if( cnt[i] >= 14 ) + if( cnt[i] < 10 ) { - name[i+4] = 'P'+(cnt[i]-14); + name[i+4] = '0'+cnt[i]; } else { - name[i+4] = 'A'+cnt[i]; + cnt[i] -= 10; + // we are skipping cap 'o' because it looks like zero. + if( cnt[i] >= 14 ) + { + name[i+4] = 'P'+(cnt[i]-14); + } + else + { + name[i+4] = 'A'+cnt[i]; + } } } + name[8] = '\0'; + } + else + { + // We are skipping 'A', 'I', 'O', and 'U' to distinguish between zero + // and one digits, and for political correctness in generated names + char b32table[32] = {'0','1','2','3','4','5','6','7','8','9' + ,'B','C','D','E','F','G','H','J','K','L','M' + ,'N','P','Q','R','S','T','V','W','X','Y','Z' }; + + // Convert Pid into base 32 ascii + cnt[0] = pid / 33554432; // (32 * 32 * 32 * 32 * 32) + rem = pid - ( cnt[0] * 33554432 ); + cnt[1] = rem / 1048576; // (32 * 32 * 32 * 32) + rem -= ( cnt[1] * 1048576 ); + cnt[2] = rem / 32768; // (32 * 32 * 32) + rem -= ( cnt[2] * 32768 ); + cnt[3] = rem / 1024; // (32 * 32) + rem -= ( cnt[3] * 1024 ); + cnt[4] = rem / 32; + rem -= ( cnt[4] * 32 ); + cnt[5] = rem; + + // Process name format long: '$Zxxxxpppppp' xxxx = nid, pppppp = pid + + // Convert Nid into base 16 ascii + sprintf(name,"$Z%4.4X",nid); + + // Convert Pid into base 32 ascii + for(i=5; i>=0; i--) + { + name[i+6] = static_cast<char>(b32table[cnt[i]]); + } + name[12] = '\0'; } - name[8] = '\0'; - TRACE_EXIT; return name; @@ -5398,6 +5480,65 @@ CProcess *CProcessContainer::CreateProcess (CProcess * parent, } #endif +#ifdef NAMESERVER_PROCESS +void CProcessContainer::DeleteAllDown() +{ + CProcess *process = NULL; + int nid = -1; + int pid = -1; + + const char method_name[] = "CProcessContainer::DeleteAllDown"; + TRACE_ENTRY; + + nameMap_t::iterator nameMapIt; + + while ( true ) + { + nameMapLock_.lock(); + nameMapIt = nameMap_->begin(); + + if (nameMap_->size() == 0) + { + nameMapLock_.unlock(); + break; // all done + } + + process = nameMapIt->second; + + // Delete name map entry + nameMap_->erase (nameMapIt); + + nameMapLock_.unlock(); + + nid = process->GetNid(); + pid = process->GetPid(); + + if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL)) + { + trace_printf("%s@%d removed from nameMap %p: %s (%d, %d)\n", + method_name, __LINE__, nameMap_, + process->GetName(), nid, pid); + } + + // Delete pid map entry + DelFromPidMap ( process ); + + if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Completed delete for %s (%d, %d)\n" + , method_name, __LINE__ + , process->GetName(), nid, pid); + } + + // Remove all processes + // PSD will re-create persistent processes on spare node activation + Exit_Process( process, true, nid ); + } + + TRACE_EXIT; +} +#endif + void CProcessContainer::DeleteFromList( CProcess *process ) { const char method_name[] = "CProcessContainer::DeleteFromList"; @@ -7086,7 +7227,9 @@ void CProcessContainer::SetProcessState( CProcess *process, STATE state, bool ab // Note: Exit_Process() will delete the process object, so // save the process information needed before the call +#ifndef NAMESERVER_PROCESS PROCESSTYPE processType = process->GetType(); +#endif string processName = process->GetName(); int processNid = process->GetNid(); int processPid = process->GetPid(); @@ -7101,6 +7244,7 @@ void CProcessContainer::SetProcessState( CProcess *process, STATE state, bool ab , processName.c_str(), processNid, processPid, processVerifier , abend, downNode , MyNode->IsKillingNode(), MyNode->IsDTMAborted(), MyNode->IsSMSAborted()); +#ifndef NAMESERVER_PROCESS if ( !MyNode->IsKillingNode() ) { switch ( processType ) @@ -7147,6 +7291,7 @@ void CProcessContainer::SetProcessState( CProcess *process, STATE state, bool ab break; } } +#endif } break; default:
http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/process.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/process.h b/core/sqf/monitor/linux/process.h index 3cde3e5..736ddcc 100644 --- a/core/sqf/monitor/linux/process.h +++ b/core/sqf/monitor/linux/process.h @@ -139,6 +139,9 @@ class CProcessContainer , void *tag , int & result ); +#ifdef NAMESERVER_PROCESS + void DeleteAllDown(); +#endif bool Dump_Process( CProcess *dumper, CProcess *process, char *core_path ); void DumpCallback( int nid, pid_t pid, int status ); void Exit_Process( CProcess *process, bool abend, int downNode ); @@ -185,10 +188,12 @@ protected: inline void SetNumProcs( int numProcs ) { numProcs_ = numProcs; }; private: - int numProcs_; // Number of processes in container + int numProcs_; // Number of processes in container sem_t *Mutex; - bool nodeContainer_; // true when physical node process container + bool nodeContainer_; // true when physical node process container + bool processNameFormatLong_; // when true process name format is: + // '$Zxxxxpppppp' xxxx = nid, pppppp = pid nameMap_t *nameMap_; pidMap_t *pidMap_; CLock pidMapLock_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpclient.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx index a88e2d2..39e4443 100644 --- a/core/sqf/monitor/linux/ptpclient.cxx +++ b/core/sqf/monitor/linux/ptpclient.cxx @@ -57,9 +57,13 @@ extern CNode *MyNode; extern CNodeContainer *Nodes; extern bool IsRealCluster; extern CMeas Meas; +extern int MyPNID; + +#define MON2MON_IO_RETRIES 3 CPtpClient::CPtpClient (void) - : ptpSock_(0) + : ptpCommPort_(0) + , ptpClusterSocks_(NULL) , seqNum_(0) { const char method_name[] = "CPtpClient::CPtpClient"; @@ -72,11 +76,10 @@ CPtpClient::CPtpClient (void) SetLocalHost(); } - - char * p = getenv( "MON2MON_COMM_PORT" ); - if ( p ) + char * env = getenv( "MON2MON_COMM_PORT" ); + if ( env ) { - basePort_ = atoi( p ); + ptpCommPort_ = atoi( env ); } else { @@ -88,6 +91,12 @@ CPtpClient::CPtpClient (void) abort(); } + ptpClusterSocks_ = new int[MAX_NODES]; + for (int i=0; i < MAX_NODES; ++i) + { + ptpClusterSocks_[i] = -1; + } + TRACE_EXIT; } @@ -96,18 +105,84 @@ CPtpClient::~CPtpClient (void) const char method_name[] = "CPtpClient::~CPtpClient"; TRACE_ENTRY; + delete [] ptpClusterSocks_; + TRACE_EXIT; } -int CPtpClient::AddUniqStr( int nid - , int id - , const char *stringValue - , int targetNid - , const char *targetNodeName ) +int CPtpClient::InitializePtpClient( int pnid, char * ptpPort ) +{ + const char method_name[] = "CPtpClient::InitializePtpClient"; + TRACE_ENTRY; + int err = 0; + + if (ptpClusterSocks_[pnid] == -1) + { + int sock = Monitor->MkCltSock( ptpPort ); + if (sock < 0) + { + err = sock; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - MkCltSock failed with error %d\n" + , method_name, __LINE__, err ); + } + } + else + { + ptpClusterSocks_[pnid] = sock; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - connected to monitor node=%d(%s), sock=%d, " + "ptpClusterSocks_[%d]=%d\n" + , method_name, __LINE__ + , pnid + , ptpPort + , sock + , pnid + , ptpClusterSocks_[pnid] ); + } + } + } + + TRACE_EXIT; + return err; +} + +bool CPtpClient::IsTargetRemote( int targetNid ) +{ + const char method_name[] = "CPtpClient::IsTargetRemote"; + TRACE_ENTRY; + + CLNode *targetLNode = Nodes->GetLNode( targetNid ); + CNode *targetNode = targetLNode->GetNode(); + bool rs = (targetNode && targetNode->GetPNid() == MyPNID) ? false : true ; + + TRACE_EXIT; + return(rs); +} + +int CPtpClient::ProcessAddUniqStr( int nid + , int id + , const char *stringValue + , int targetNid + , const char *targetNodeName ) { - const char method_name[] = "CPtpClient::AddUniqStr"; + const char method_name[] = "CPtpClient::ProcessAddUniqStr"; TRACE_ENTRY; + if (!IsTargetRemote( targetNid )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_UniqStr request to " + "local nid=%d\n" + , method_name, __LINE__ + , targetNid ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - Sending InternalType_UniqStr request to %s, " @@ -129,59 +204,32 @@ int CPtpClient::AddUniqStr( int nid // Copy the string memcpy( stringData, stringValue, stringDataLen ); - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.uniqstr); - size += stringDataLen; - + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.uniqstr); + myInfo.size += stringDataLen; + if (trace_settings & TRACE_PROCESS_DETAIL) { trace_printf( "%s@%d - size_=%d, forwarding unique string [%d, %d] (%s)\n" , method_name, __LINE__ - , size + , myInfo.size , msg.u.uniqstr.nid , msg.u.uniqstr.id , &msg.u.uniqstr.valueData ); } - int error = SendToMon("add-unique-string", &msg, size, targetNid, targetNodeName); + int error = SendToMon( "process-add-unique-string" + , &msg + , myInfo + , targetNid + , targetNodeName); TRACE_EXIT; return error; } -int CPtpClient::InitializePtpClient( char * ptpPort ) -{ - const char method_name[] = "CPtpClient::InitializePtpClient"; - TRACE_ENTRY; - int err = 0; - - int sock = Monitor->MkCltSock( ptpPort ); - if (sock < 0) - { - err = sock; - - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) - { - trace_printf( "%s@%d - MkCltSock failed with error %d\n" - , method_name, __LINE__, err ); - } - } - else - { - ptpSock_ = sock; - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) - { - trace_printf( "%s@%d - connected to monitor node=%s, sock=%d\n" - , method_name, __LINE__ - , ptpPort - , ptpSock_ ); - } - } - - TRACE_EXIT; - return err; -} - int CPtpClient::ProcessClone( CProcess *process ) { const char method_name[] = "CPtpClient::ProcessClone"; @@ -209,6 +257,18 @@ int CPtpClient::ProcessClone( CProcess *process ) return(0); } + if (!IsTargetRemote( process->GetParentNid() )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Clone request to " + "local nid=%d\n" + , method_name, __LINE__ + , process->GetParentNid() ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d" @@ -281,13 +341,15 @@ int CPtpClient::ProcessClone( CProcess *process ) msg.u.clone.argvLen = argvLen; memcpy( stringData, process->userArgv(), argvLen ); - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.clone); - size += nameLen ; - size += portLen ; - size += infileLen ; - size += outfileLen ; - size += argvLen ; + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.clone); + myInfo.size += nameLen ; + myInfo.size += portLen ; + myInfo.size += infileLen ; + myInfo.size += outfileLen ; + myInfo.size += argvLen ; if (trace_settings & TRACE_PROCESS_DETAIL) { @@ -299,7 +361,7 @@ int CPtpClient::ProcessClone( CProcess *process ) "outfile=%s, strlen(outfile)=%d, " "argc=%d, strlen(total argv)=%d, args=[%.*s]\n" , method_name, __LINE__ - , size + , myInfo.size , msg.u.clone.programStrId.nid , msg.u.clone.programStrId.id , msg.u.clone.pathStrId.nid @@ -322,7 +384,7 @@ int CPtpClient::ProcessClone( CProcess *process ) int error = SendToMon( "process-clone" , &msg - , size + , myInfo , process->GetParentNid() , parentLNode->GetNode()->GetName()); @@ -337,6 +399,18 @@ int CPtpClient::ProcessExit( CProcess *process const char method_name[] = "CPtpClient::ProcessExit"; TRACE_ENTRY; + if (!IsTargetRemote( targetNid )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Exit request to " + "local nid=%d\n" + , method_name, __LINE__ + , targetNid ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - Sending InternalType_Exit request to %s, targetNid=%d" @@ -359,15 +433,17 @@ int CPtpClient::ProcessExit( CProcess *process strcpy(msg.u.exit.name, process->GetName()); msg.u.exit.abended = process->IsAbended(); - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.exit); + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.exit); if (trace_settings & TRACE_PROCESS_DETAIL) { trace_printf( "%s@%d - size_=%d, process %s (%d,%d:%d) " "abended=%d\n" , method_name, __LINE__ - , size + , myInfo.size , msg.u.exit.name , msg.u.exit.nid , msg.u.exit.pid @@ -375,7 +451,11 @@ int CPtpClient::ProcessExit( CProcess *process , msg.u.exit.abended ); } - int error = SendToMon("process-exit", &msg, size, targetNid, targetNodeName); + int error = SendToMon( "process-exit" + , &msg + , myInfo + , targetNid + , targetNodeName); TRACE_EXIT; return error; @@ -411,6 +491,18 @@ int CPtpClient::ProcessInit( CProcess *process return(0); } + if (!IsTargetRemote( process->GetParentNid() )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_ProcessInit request to " + "local nid=%d\n" + , method_name, __LINE__ + , process->GetParentNid() ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d" " - Sending InternalType_ProcessInit to parent node %s, parentNid=%d" @@ -438,12 +530,14 @@ int CPtpClient::ProcessInit( CProcess *process msg.u.processInit.tag = tag; msg.u.processInit.origNid = process->GetParentNid(); - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.processInit); + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.processInit); int error = SendToMon( "process-init" , &msg - , size + , myInfo , parentNid , parentLNode->GetNode()->GetName() ); @@ -460,6 +554,18 @@ int CPtpClient::ProcessKill( CProcess *process const char method_name[] = "CPtpClient::ProcessKill"; TRACE_ENTRY; + if (!IsTargetRemote( targetNid )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Kill request to " + "local nid=%d\n" + , method_name, __LINE__ + , targetNid ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - Sending InternalType_Kill request to %s, targetNid=%d" @@ -480,22 +586,28 @@ int CPtpClient::ProcessKill( CProcess *process msg.u.kill.verifier = process->GetVerifier(); msg.u.kill.persistent_abort = abort; - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.exit); + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.exit); if (trace_settings & TRACE_PROCESS_DETAIL) { trace_printf( "%s@%d - size_=%d, process (%d,%d:%d) " "persistent_abort=%d\n" , method_name, __LINE__ - , size + , myInfo.size , msg.u.kill.nid , msg.u.kill.pid , msg.u.kill.verifier , msg.u.kill.persistent_abort ); } - int error = SendToMon("process-kill", &msg, size, targetNid, targetNodeName); + int error = SendToMon( "process-kill" + , &msg + , myInfo + , targetNid + , targetNodeName); TRACE_EXIT; return error; @@ -508,6 +620,18 @@ int CPtpClient::ProcessNew( CProcess *process const char method_name[] = "CPtpClient::ProcessNew"; TRACE_ENTRY; + if (!IsTargetRemote( targetNid )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Process request to " + "local nid=%d\n" + , method_name, __LINE__ + , targetNid ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - Sending InternalType_Process request to %s, targetNid=%d" @@ -567,12 +691,14 @@ int CPtpClient::ProcessNew( CProcess *process msg.u.process.argvLen = argvLen; memcpy( stringData, process->userArgv(), argvLen ); - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.process); - size += nameLen ; - size += infileLen ; - size += outfileLen ; - size += argvLen ; + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.process); + myInfo.size += nameLen ; + myInfo.size += infileLen ; + myInfo.size += outfileLen ; + myInfo.size += argvLen ; if (trace_settings & TRACE_PROCESS_DETAIL) { @@ -583,7 +709,7 @@ int CPtpClient::ProcessNew( CProcess *process "outfile=%s, strlen(outfile)=%d, " "argc=%d, strlen(total argv)=%d, args=[%.*s]\n" , method_name, __LINE__ - , size + , myInfo.size , msg.u.process.programStrId.nid , msg.u.process.programStrId.id , msg.u.process.pathStrId.nid @@ -602,7 +728,11 @@ int CPtpClient::ProcessNew( CProcess *process , &msg.u.process.stringData+nameLen+infileLen+outfileLen); } - int error = SendToMon("process-new", &msg, size, targetNid, targetNodeName); + int error = SendToMon( "process-new" + , &msg + , myInfo + , targetNid + , targetNodeName); TRACE_EXIT; return error; @@ -620,6 +750,18 @@ int CPtpClient::ProcessNotify( int nid const char method_name[] = "CPtpClient::ProcessNotify"; TRACE_ENTRY; + if (!IsTargetRemote( targetNid )) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Notify request to " + "local nid=%d\n" + , method_name, __LINE__ + , targetNid ); + } + return(0); + } + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - Sending InternalType_Notify request to %s" @@ -682,174 +824,224 @@ int CPtpClient::ProcessNotify( int nid } } - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.notify); + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.notify); - int error = SendToMon("process-notify", &msg, size, targetNid, targetNodeName); + int error = SendToMon( "process-notify" + , &msg + , myInfo + , targetNid + , targetNodeName); TRACE_EXIT; return error; } -int CPtpClient::ReceiveSock(char *buf, int size, int sockFd) +int CPtpClient::ProcessStdInReq( int nid + , int pid + , StdinReqType type + , int supplierNid + , int supplierPid ) { - const char method_name[] = "CPtpClient::ReceiveSock"; + const char method_name[] = "CPtpClient::ProcessStdInReq"; TRACE_ENTRY; - bool readAgain = false; - int error = 0; - int readCount = 0; - int received = 0; - int sizeCount = size; - - do + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) { - readCount = (int) recv( sockFd - , buf - , sizeCount - , 0 ); - if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount ); + trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d " + "from (%d,%d), for supplier (%d,%d)\n" + , method_name, __LINE__ + , type + , nid + , pid + , supplierNid + , supplierPid ); + } - if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - Count read %d = recv(%d)\n" - , method_name, __LINE__ - , readCount - , sizeCount ); - } - - if ( readCount > 0 ) - { // Got data - received += readCount; - buf += readCount; - if ( received == size ) - { - readAgain = false; - } - else - { - sizeCount -= readCount; - readAgain = true; - } - } - else if ( readCount == 0 ) - { // EOF - error = ENODATA; - readAgain = false; - } - else - { // Got an error - if ( errno != EINTR) - { - error = errno; - readAgain = false; - } - else - { - readAgain = true; - } - } + CLNode *lnode = Nodes->GetLNode( supplierNid ); + if (lnode == NULL) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find supplier node nid=%d " + "for stdin data request.\n" + , method_name + , supplierNid ); + mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; } - while( readAgain ); - if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) + CProcess *process = lnode->GetProcessL( supplierPid ); + if (process == NULL) { - trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n" + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find process nid=%d, " + "pid=%d for stdin data request.\n" + , method_name + , supplierNid + , supplierPid ); + mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_StdinReq; + msg.u.stdin_req.nid = nid; + msg.u.stdin_req.pid = pid; + msg.u.stdin_req.reqType = type; + msg.u.stdin_req.supplier_nid = supplierNid; + msg.u.stdin_req.supplier_pid = supplierPid; + + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.stdin_req); + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d - size_=%d, type =%d " + "from (%d,%d), for supplier (%d,%d)\n" , method_name, __LINE__ - , received - , error, strerror(error) ); + , myInfo.size + , msg.u.stdin_req.reqType + , msg.u.stdin_req.nid + , msg.u.stdin_req.pid + , msg.u.stdin_req.supplier_nid + , msg.u.stdin_req.supplier_pid ); } + int error = SendToMon( "process-stdin" + , &msg + , myInfo + , process->GetNid() + , lnode->GetNode()->GetName()); + TRACE_EXIT; return error; } -void CPtpClient::SetLocalHost( void ) -{ - gethostname( ptpHost_, MAX_PROCESSOR_NAME ); -} - -int CPtpClient::SendSock(char *buf, int size, int sockFd) +int CPtpClient::ProcessStdIoData( int nid + , int pid + , StdIoType type + , ssize_t count + , char *data ) { - const char method_name[] = "CPtpClient::SendSock"; + const char method_name[] = "CPtpClient::ProcessStdIoData"; TRACE_ENTRY; - - bool sendAgain = false; - int error = 0; - int sendCount = 0; - int sent = 0; - - do - { - sendCount = (int) send( sockFd - , buf - , size - , 0 ); - if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount ); - if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - send(), sendCount=%d\n" - , method_name, __LINE__ - , sendCount ); - } - - if ( sendCount > 0 ) - { // Sent data - sent += sendCount; - if ( sendCount == size ) - { - sendAgain = false; - } - else - { - sendAgain = true; - } - } - else - { // Got an error - if ( errno != EINTR) - { - error = errno; - sendAgain = false; - } - else - { - sendAgain = true; - } - } + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_IoData request type =%d " + "to (%d,%d), count=%ld\n" + , method_name, __LINE__ + , type + , nid + , pid + , count ); } - while( sendAgain ); - if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) + CLNode *lnode = Nodes->GetLNode( nid ); + if (lnode == NULL) { - trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n" + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find supplier node nid=%d " + "for stdin data request.\n" + , method_name + , nid ); + mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + CProcess *process = lnode->GetProcessL( pid ); + if (process == NULL) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find process nid=%d, " + "pid=%d for stdin data request.\n" + , method_name + , nid + , pid ); + mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf); + + TRACE_EXIT; + return -1; + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_IoData; + msg.u.iodata.nid = nid ; + msg.u.iodata.pid = pid ; + msg.u.iodata.ioType = type ; + msg.u.iodata.length = count; + memcpy(&msg.u.iodata.data, data, count); + + ptpMsgInfo_t myInfo; + myInfo.pnid = MyPNID; + myInfo.size = offsetof(struct internal_msg_def, u); + myInfo.size += sizeof(msg.u.iodata); + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d - size_=%d, type =%d " + "to (%d,%d), count=%d\n(%s)" , method_name, __LINE__ - , sent - , error, strerror(error) ); + , myInfo.size + , msg.u.iodata.ioType + , msg.u.iodata.nid + , msg.u.iodata.pid + , msg.u.iodata.length + , msg.u.iodata.length?msg.u.iodata.data:"\n" ); } + int error = SendToMon( "process-stdio-data" + , &msg + , myInfo + , process->GetNid() + , lnode->GetNode()->GetName()); + TRACE_EXIT; return error; } -int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, - int receiveNode, const char *hostName) +int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg + , ptpMsgInfo_t &myInfo + , int targetNid, const char *hostName) { const char method_name[] = "CPtpClient::SendToMon"; TRACE_ENTRY; - char monPortString[MAX_PROCESSOR_NAME]; char ptpHost[MAX_PROCESSOR_NAME]; char ptpPort[MAX_PROCESSOR_NAME]; - int tempPort = basePort_; - + int error = 0; + int tempPort = ptpCommPort_; + int pnid = 0; + int sendSock = -1; + int retryCount = 0; + CNode *node = NULL; + CLNode *lnode = NULL; + ptpHost[0] = '\0'; + lnode = Nodes->GetLNode( targetNid ); + node = lnode->GetNode(); + pnid = node->GetPNid(); // For virtual env if (!IsRealCluster) { - tempPort += receiveNode; + tempPort += targetNid; strcat( ptpHost, ptpHost_ ); } else @@ -859,243 +1051,245 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { - trace_printf( "%s@%d - reqType=%s, hostName=%s, receiveNode=%d, " - "ptpHost=%s, tempPort=%d, basePort_=%d\n" + trace_printf( "%s@%d - reqType=%s, hostName=%s, targetNid=%d, " + "ptpHost=%s, tempPort=%d, ptpCommPort_=%d\n" , method_name, __LINE__ , reqType , hostName - , receiveNode + , targetNid , ptpHost , tempPort - , basePort_ ); + , ptpCommPort_ ); } memset( &ptpPort, 0, MAX_PROCESSOR_NAME ); memset( &ptpPortBase_, 0, MAX_PROCESSOR_NAME+100 ); + sprintf( ptpPortBase_,"%s:", ptpHost ); + sprintf( ptpPort,"%s%d", ptpPortBase_, tempPort ); - strcat( ptpPortBase_, ptpHost ); - strcat( ptpPortBase_, ":" ); - sprintf( monPortString,"%d", tempPort ); - strcat( ptpPort, ptpPortBase_ ); - strcat( ptpPort, monPortString ); +retryIO: - int error = InitializePtpClient( ptpPort ); - if (error < 0) + if (ptpClusterSocks_[pnid] == -1) { - TRACE_EXIT; - return error; + error = InitializePtpClient( pnid, ptpPort ); + if (error < 0) + { + ptpClusterSocks_[pnid] = -1; + TRACE_EXIT; + return error; + } } + sendSock = ptpClusterSocks_[pnid]; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - sending %s REQ to Monitor=%s, sock=%d\n" , method_name, __LINE__ , reqType , ptpPort - , ptpSock_); + , sendSock ); } - error = SendSock((char *) &size, sizeof(size), ptpSock_); + error = SockSend((char *) &myInfo, sizeof(ptpMsgInfo_t), sendSock); if (error) { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + int err = error; + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], unable to send %s request size %ld to " + "node %s, error: %d(%s)\n" + , method_name, reqType, sizeof(ptpMsgInfo_t), ptpHost, err, strerror(err) ); + mon_log_write(PTPCLIENT_SENDTOMON_1, SQ_LOG_ERR, buf); + } + else + { + error = SockSend((char *) msg, myInfo.size, sendSock); + if (error) { - trace_printf( "%s@%d - error sending to Monitor=%s, sock=%d, error=%d\n" - , method_name, __LINE__ - , ptpPort - , ptpSock_ - , error ); + int err = error; + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], unable to send %s request to " + "node %s, error: %d(%s)\n" + , method_name, reqType, ptpHost, err, strerror(err) ); + mon_log_write(PTPCLIENT_SENDTOMON_2, SQ_LOG_ERR, buf); } } - error = SendSock((char *) msg, size, ptpSock_); if (error) { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + SockClose( pnid ); + if ( retryCount < MON2MON_IO_RETRIES ) { - trace_printf( "%s@%d - error sending to nameserver=%s, sock=%d, error=%d\n" - , method_name, __LINE__ - , ptpPort - , ptpSock_ - , error ); + retryCount++; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - retrying IO (%d) to node %s\n" + , method_name, __LINE__ + , retryCount + , ptpHost ); + } + goto retryIO; } } - - close( ptpSock_ ); TRACE_EXIT; return error; } -int CPtpClient::StdInReq( int nid - , int pid - , StdinReqType type - , int supplierNid - , int supplierPid ) +void CPtpClient::SockClose( int pnid ) { - const char method_name[] = "CPtpClient::StdInReq"; + const char method_name[] = "CPtpClient::SockClose"; TRACE_ENTRY; - if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + if (ptpClusterSocks_[pnid] != -1) { - trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d " - "from (%d,%d), for supplier (%d,%d)\n" - , method_name, __LINE__ - , type - , nid - , pid - , supplierNid - , supplierPid ); + close( ptpClusterSocks_[pnid] ); + ptpClusterSocks_[pnid] = -1; } - CLNode *lnode = Nodes->GetLNode( supplierNid ); - if (lnode == NULL) - { - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s], Can't find supplier node nid=%d " - "for stdin data request.\n" - , method_name - , supplierNid ); - mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf); + TRACE_EXIT; +} - TRACE_EXIT; - return -1; - } +void CPtpClient::SetLocalHost( void ) +{ + gethostname( ptpHost_, MAX_PROCESSOR_NAME ); +} - CProcess *process = lnode->GetProcessL( supplierPid ); - if (process == NULL) +int CPtpClient::SockReceive(char *buf, int size, int sockFd) +{ + const char method_name[] = "CPtpClient::SockReceive"; + TRACE_ENTRY; + + bool readAgain = false; + int error = 0; + int readCount = 0; + int received = 0; + int sizeCount = size; + + do { - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s], Can't find process nid=%d, " - "pid=%d for stdin data request.\n" - , method_name - , supplierNid - , supplierPid ); - mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf); + readCount = (int) recv( sockFd + , buf + , sizeCount + , 0 ); + if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount ); - TRACE_EXIT; - return -1; + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Count read %d = recv(%d)\n" + , method_name, __LINE__ + , readCount + , sizeCount ); + } + + if ( readCount > 0 ) + { // Got data + received += readCount; + buf += readCount; + if ( received == size ) + { + readAgain = false; + } + else + { + sizeCount -= readCount; + readAgain = true; + } + } + else if ( readCount == 0 ) + { // EOF + error = ENODATA; + readAgain = false; + } + else + { // Got an error + if ( errno != EINTR) + { + error = errno; + readAgain = false; + } + else + { + readAgain = true; + } + } } + while( readAgain ); - struct internal_msg_def msg; - memset(&msg, 0, sizeof(msg)); - msg.type = InternalType_StdinReq; - msg.u.stdin_req.nid = nid; - msg.u.stdin_req.pid = pid; - msg.u.stdin_req.reqType = type; - msg.u.stdin_req.supplier_nid = supplierNid; - msg.u.stdin_req.supplier_pid = supplierPid; - - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.stdin_req); - - if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL)) + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d - size_=%d, type =%d " - "from (%d,%d), for supplier (%d,%d)\n" + trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n" , method_name, __LINE__ - , size - , msg.u.stdin_req.reqType - , msg.u.stdin_req.nid - , msg.u.stdin_req.pid - , msg.u.stdin_req.supplier_nid - , msg.u.stdin_req.supplier_pid ); + , received + , error, strerror(error) ); } - int error = SendToMon("stdin" - , &msg - , size - , process->GetNid() - , lnode->GetNode()->GetName()); - TRACE_EXIT; return error; } -int CPtpClient::StdIoData( int nid - , int pid - , StdIoType type - , ssize_t count - , char *data ) +int CPtpClient::SockSend(char *buf, int size, int sockFd) { - const char method_name[] = "CPtpClient::StdIoData"; + const char method_name[] = "CPtpClient::SockSend"; TRACE_ENTRY; - - if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) - { - trace_printf( "%s@%d - Sending InternalType_IoData request type =%d " - "to (%d,%d), count=%ld\n" - , method_name, __LINE__ - , type - , nid - , pid - , count ); - } - - CLNode *lnode = Nodes->GetLNode( nid ); - if (lnode == NULL) - { - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s], Can't find supplier node nid=%d " - "for stdin data request.\n" - , method_name - , nid ); - mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf); - - TRACE_EXIT; - return -1; - } - - CProcess *process = lnode->GetProcessL( pid ); - if (process == NULL) + + bool sendAgain = false; + int error = 0; + int sendCount = 0; + int sent = 0; + + do { - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s], Can't find process nid=%d, " - "pid=%d for stdin data request.\n" - , method_name - , nid - , pid ); - mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf); + sendCount = (int) send( sockFd + , buf + , size + , 0 ); + if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount ); - TRACE_EXIT; - return -1; + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - send(), sendCount=%d\n" + , method_name, __LINE__ + , sendCount ); + } + + if ( sendCount > 0 ) + { // Sent data + sent += sendCount; + if ( sendCount == size ) + { + sendAgain = false; + } + else + { + sendAgain = true; + } + } + else + { // Got an error + if ( errno != EINTR) + { + error = errno; + sendAgain = false; + } + else + { + sendAgain = true; + } + } } + while( sendAgain ); - struct internal_msg_def msg; - memset(&msg, 0, sizeof(msg)); - msg.type = InternalType_IoData; - msg.u.iodata.nid = nid ; - msg.u.iodata.pid = pid ; - msg.u.iodata.ioType = type ; - msg.u.iodata.length = count; - memcpy(&msg.u.iodata.data, data, count); - - int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.iodata); - - if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL)) + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d - size_=%d, type =%d " - "to (%d,%d), count=%d\n(%s)" + trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n" , method_name, __LINE__ - , size - , msg.u.iodata.ioType - , msg.u.iodata.nid - , msg.u.iodata.pid - , msg.u.iodata.length - , msg.u.iodata.length?msg.u.iodata.data:"\n" ); + , sent + , error, strerror(error) ); } - int error = SendToMon("stdio-data" - , &msg - , size - , process->GetNid() - , lnode->GetNode()->GetName()); - TRACE_EXIT; return error; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpclient.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h index e6ddeb4..5239c78 100644 --- a/core/sqf/monitor/linux/ptpclient.h +++ b/core/sqf/monitor/linux/ptpclient.h @@ -40,58 +40,66 @@ public: CPtpClient( void ); virtual ~CPtpClient( void ); - int AddUniqStr( int nid - , int id - , const char *stringValue - , int targetNid - , const char *targetNodeName ); - int InitializePtpClient( char * ptpPort ); - int ProcessClone( CProcess *process ); + int InitializePtpClient( int pnid, char* ptpPort ); + int ProcessAddUniqStr( int nid + , int id + , const char* stringValue + , int targetNid + , const char* targetNodeName ); + int ProcessClone( CProcess* process ); int ProcessExit( CProcess* process , int parentNid - , const char *targetNodeName ); - int ProcessInit( CProcess *process - , void *tag + , const char* targetNodeName ); + int ProcessInit( CProcess* process + , void* tag , int result , int parentNid ); int ProcessKill( CProcess* process , bool abort , int targetNid - , const char *targetNodeName ); + , const char* targetNodeName ); int ProcessNew( CProcess* process , int targetNid - , const char *targetNodeName ); + , const char* targetNodeName ); int ProcessNotify( int nid , int pid , Verifier_t verifier , _TM_Txid_External transId , bool canceled - , CProcess *targetProcess + , CProcess* targetProcess , int targetNid - , const char *targetNodeName ); - int StdInReq( int nid - , int pid - , StdinReqType type - , int supplierNid - , int supplierPid ); - int StdIoData( int nid - , int pid - , StdIoType type - , ssize_t count - , char *data ); + , const char* targetNodeName ); + int ProcessStdInReq( int nid + , int pid + , StdinReqType type + , int supplierNid + , int supplierPid ); + int ProcessStdIoData( int nid + , int pid + , StdIoType type + , ssize_t count + , char* data ); private: - int basePort_; + int ptpCommPort_; char ptpHost_[MAX_PROCESSOR_NAME]; char ptpPortBase_[MAX_PROCESSOR_NAME+100]; - int ptpSock_; + int *ptpClusterSocks_; int seqNum_; - int ReceiveSock(char *buf, int size, int sockFd); - int SendSock(char *buf, int size, int sockFd); - int SendToMon(const char *reqType, internal_msg_def *msg, int size, int receiveNode, const char *hostName); + bool IsTargetRemote( int targetNid ); + int SendToMon( const char* reqType + , internal_msg_def* msg + , ptpMsgInfo_t &myInfo + , int receiveNode + , const char* hostName); void SetLocalHost( void ); + void SockClose( int pnid ); + int SockReceive(char* buf, int size, int sockFd); + int SockSend( char* buf + , int size + , int sockFd); }; #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpcommaccept.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx index d380d3a..15933dd 100644 --- a/core/sqf/monitor/linux/ptpcommaccept.cxx +++ b/core/sqf/monitor/linux/ptpcommaccept.cxx @@ -47,6 +47,7 @@ extern char *ErrorMsg (int error_code); extern const char *StateString( STATE state); extern CommType_t CommType; +static void *ptpProcess( void *arg ); CPtpCommAccept::CPtpCommAccept() : accepting_(true) @@ -71,156 +72,206 @@ void CPtpCommAccept::processNewSock( int sockFd ) { const char method_name[] = "CPtpCommAccept::processNewSock"; TRACE_ENTRY; - - struct internal_msg_def msg; + int rc; - - mem_log_write(CMonLog::MON_CONNTONEWMON_2); - int size; - rc = Monitor->ReceiveSock( (char *) &size, sizeof(size), sockFd, method_name ); - if ( rc ) - { // Handle error - close( sockFd ); - char buf[MON_STRING_BUF_SIZE]; - snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " - "monitor: %s.\n", method_name, ErrorMsg(rc)); - mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf); - return; - } - // Get info about connecting monitor - rc = Monitor->ReceiveSock( (char *) &msg - , size - , sockFd - , method_name ); - - if ( rc ) - { // Handle error - close( sockFd ); + mem_log_write(CMonLog::MON_CONNTONEWMON_1); + + // need to create context in case back-to-back accept is too fast + Context *ctx = new Context(); + ctx->this_ = this; + ctx->pendingFd_ = sockFd; + rc = pthread_create(&process_thread_id_, NULL, ptpProcess, ctx); + if (rc != 0) + { char buf[MON_STRING_BUF_SIZE]; - snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " - "monitor: %s.\n", method_name, ErrorMsg(rc)); - mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf); - return; + snprintf(buf, sizeof(buf), "[%s], ptpProcess thread create error=%d\n", + method_name, rc); + mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf); } - else + + TRACE_EXIT; +} + +void CPtpCommAccept::processMonReqs( int sockFd ) +{ + const char method_name[] = "CPtpCommAccept::processMonReqs"; + TRACE_ENTRY; + + int rc; + struct internal_msg_def msg; + + while ( true ) { - switch ( msg.type ) + mem_log_write(CMonLog::MON_CONNTONEWMON_2); + ptpMsgInfo_t remoteInfo; + + // Get info about connecting monitor + rc = Monitor->ReceiveSock( (char *) &remoteInfo + , sizeof(ptpMsgInfo_t) + , sockFd + , method_name ); + if ( rc ) + { // Handle error + char buf[MON_STRING_BUF_SIZE]; + snprintf(buf, sizeof(buf), "[%s], unable to obtain message size and pnid " + "from remote monitor: %s.\n", method_name, ErrorMsg(rc)); + mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf); + return; + } + + // Get info about connecting monitor + rc = Monitor->ReceiveSock( (char *) &msg + , remoteInfo.size + , sockFd + , method_name ); + if ( rc ) + { // Handle error + char buf[MON_STRING_BUF_SIZE]; + CNode *node = Nodes->GetNode(remoteInfo.pnid); + snprintf( buf, sizeof(buf) + , "[%s], unable to obtain message size (%d) from remote " + "monitor %d(%s), error: %s.\n" + , method_name + , remoteInfo.size + , remoteInfo.pnid + , node ? node->GetName() : "" + , ErrorMsg(rc)); + mon_log_write(PTP_COMMACCEPT_3, SQ_LOG_ERR, buf); + return; + } + else { - case InternalType_UniqStr: + switch ( msg.type ) { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_UniqStr: { - trace_printf( "%s@%d" " - Received InternalType_UniqStr\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_UniqStr\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr); + break; } - ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr); - break; - } - case InternalType_Process: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_Process: { - trace_printf( "%s@%d" " - Received InternalType_Process\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Process\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueNewProcReq( &msg.u.process); + break; } - ReqQueue.enqueueNewProcReq( &msg.u.process); - break; - } - case InternalType_ProcessInit: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_ProcessInit: { - trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n" - , method_name, __LINE__ ); - } - if ( MyNode->IsMyNode(msg.u.processInit.origNid) ) - { // New process request originated on this node - ReqQueue.enqueueProcInitReq( &msg.u.processInit); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n" + , method_name, __LINE__ ); + } + if ( MyNode->IsMyNode(msg.u.processInit.origNid) ) + { // New process request originated on this node + ReqQueue.enqueueProcInitReq( &msg.u.processInit); + } + else + { + abort(); + } + break; } - else + case InternalType_Clone: { - abort(); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Clone\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueCloneReq( &msg.u.clone ); + break; } - break; - } - case InternalType_Clone: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_Open: { - trace_printf( "%s@%d" " - Received InternalType_Clone\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Open\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueOpenReq( &msg.u.open ); + break; } - ReqQueue.enqueueCloneReq( &msg.u.clone ); - break; - } - case InternalType_Open: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_Notify: { - trace_printf( "%s@%d" " - Received InternalType_Open\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Notify\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueNotifyReq( &msg.u.notify ); + break; } - ReqQueue.enqueueOpenReq( &msg.u.open ); - break; - } - case InternalType_Notify: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_Exit: { - trace_printf( "%s@%d" " - Received InternalType_Notify\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Exit\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueExitReq( &msg.u.exit ); + break; } - ReqQueue.enqueueNotifyReq( &msg.u.notify ); - break; - } - case InternalType_Exit: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_Kill: { - trace_printf( "%s@%d" " - Received InternalType_Exit\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Kill\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueKillReq( &msg.u.kill ); + break; } - ReqQueue.enqueueExitReq( &msg.u.exit ); - break; - } - case InternalType_Kill: - { - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + case InternalType_IoData: { - trace_printf( "%s@%d" " - Received InternalType_Kill\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_IoData\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueIoDataReq( &msg.u.iodata ); + break; } - ReqQueue.enqueueKillReq( &msg.u.kill ); - break; - } - case InternalType_IoData: - { - if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + case InternalType_StdinReq: { - trace_printf( "%s@%d" " - Received InternalType_IoData\n" - , method_name, __LINE__ ); + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_StdinReq\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueStdInReq( &msg.u.stdin_req ); + break; } - ReqQueue.enqueueIoDataReq( &msg.u.iodata ); - break; - } - case InternalType_StdinReq: - { - if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + default: { - trace_printf( "%s@%d" " - Received InternalType_StdinReq\n" - , method_name, __LINE__ ); + char buf[MON_STRING_BUF_SIZE]; + CNode *node = Nodes->GetNode(remoteInfo.pnid); + snprintf( buf, sizeof(buf) + , "[%s], Invalid msg.type: %d, msg size=%d, " + "remote monitor %d(%s)\n" + , method_name + , msg.type + , remoteInfo.size + , remoteInfo.pnid + , node ? node->GetName() : "" ); + mon_log_write(PTP_COMMACCEPT_4, SQ_LOG_ERR, buf); + abort(); } - ReqQueue.enqueueStdInReq( &msg.u.stdin_req ); - break; - } - default: - { - abort(); } } } + close( sockFd ); + TRACE_EXIT; } @@ -285,7 +336,7 @@ void CPtpCommAccept::commAcceptorSock() continue; // Ok to accept another connection } } - + if (shutdown_) { // We are being notified to exit. break; @@ -296,12 +347,12 @@ void CPtpCommAccept::commAcceptorSock() char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n", method_name, strerror(errno)); - mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_5, SQ_LOG_ERR, buf); } else { processNewSock( sockFd ); - close( sockFd ); + //close( sockFd ); } } @@ -334,7 +385,7 @@ void CPtpCommAccept::shutdownWork(void) TRACE_EXIT; } -// Initialize PtpCommAcceptor thread +// Initialize ptpCommAcceptor thread static void *ptpCommAccept(void *arg) { const char method_name[] = "ptpCommAccept"; @@ -353,7 +404,7 @@ static void *ptpCommAccept(void *arg) char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n", method_name, rc); - mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf); } // Enter thread processing loop @@ -364,7 +415,38 @@ static void *ptpCommAccept(void *arg) } -// Create a commAcceptor thread +// Initialize ptpProcess thread +static void *ptpProcess(void *arg) +{ + const char method_name[] = "ptpProcess"; + TRACE_ENTRY; + + // Parameter passed to the thread is an context + CPtpCommAccept::Context *ctx = (CPtpCommAccept::Context *) arg; + CPtpCommAccept *cao = ctx->this_; + + // Mask all allowed signals + sigset_t mask; + sigfillset(&mask); + sigdelset(&mask, SIGPROF); // allows profiling such as google profiler + int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL); + if (rc != 0) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n", + method_name, rc); + mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf); + } + + // Enter thread processing loop + cao->processMonReqs(ctx->pendingFd_); + delete ctx; + + TRACE_EXIT; + return NULL; +} + +// Create a ptpCommAccept thread void CPtpCommAccept::start() { const char method_name[] = "CPtpCommAccept::start"; http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/ptpcommaccept.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpcommaccept.h b/core/sqf/monitor/linux/ptpcommaccept.h index ca58139..78e9fe0 100644 --- a/core/sqf/monitor/linux/ptpcommaccept.h +++ b/core/sqf/monitor/linux/ptpcommaccept.h @@ -41,12 +41,19 @@ public: bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); } void monReqExec( void *req ); //stupid compiler and circular header files + void processMonReqs( int sockFd ); void processNewSock( int sockFd ); void startAccepting( void ); void stopAccepting( void ); void start( void ); void shutdownWork( void ); + typedef struct + { + CPtpCommAccept *this_; + int pendingFd_; + } Context; + private: void commAcceptorSock( void ); @@ -54,9 +61,10 @@ private: bool accepting_; bool shutdown_; - // commAccept thread's id + // ptpCommAccept thread's id pthread_t thread_id_; - + // ptpProcess thread's id + pthread_t process_thread_id_; }; #endif http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/redirector.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/redirector.cxx b/core/sqf/monitor/linux/redirector.cxx index 43bb231..70e8f9c 100644 --- a/core/sqf/monitor/linux/redirector.cxx +++ b/core/sqf/monitor/linux/redirector.cxx @@ -564,7 +564,10 @@ CRedirectAncestorStdin::~CRedirectAncestorStdin() TRACE_ENTRY; // Delete pending buffer (if any) - delete buffer_; + if (buffer_) + { + delete [] buffer_; + } // Delete queued data (if any) while (!ioDataList_.empty()) @@ -572,7 +575,7 @@ CRedirectAncestorStdin::~CRedirectAncestorStdin() // Get first data buffer from list buffer_ = ioDataList_.front(); ioDataList_.pop_front(); - delete buffer_; + delete [] buffer_; } // Alter eyecatcher sequence as a debugging aid to identify deleted object @@ -646,7 +649,7 @@ int CRedirectAncestorStdin::handleInput() retVal = -1; bufferPos_ = 0; - delete buffer_; + delete [] buffer_; buffer_ = NULL; reqType = STDIN_FLOW_ON; @@ -659,7 +662,7 @@ int CRedirectAncestorStdin::handleInput() else { // Have written all data, will need to get more. bufferPos_ = 0; - delete buffer_; + delete [] buffer_; buffer_ = NULL; reqType = STDIN_FLOW_ON; @@ -667,11 +670,11 @@ int CRedirectAncestorStdin::handleInput() if (NameServerEnabled) { - PtpClient->StdInReq( MyPNID - , pid_ - , reqType - , ancestorNid_ - , ancestorPid_ ); + PtpClient->ProcessStdInReq( MyPNID + , pid_ + , reqType + , ancestorNid_ + , ancestorPid_ ); } else { @@ -792,7 +795,7 @@ CRedirectStdinRemote::CRedirectStdinRemote(const char *filename, char buf[MON_STRING_BUF_SIZE]; sprintf(buf, "[%s], %s is an unsupported file type.\n", method_name, filename); - mon_log_write(MON_REDIR_STDINREMOTE_2, SQ_LOG_ERR, buf); + mon_log_write(MON_REDIR_STDINREMOTE_2, SQ_LOG_INFO, buf); close(fd_); fd_ = -1; @@ -874,11 +877,11 @@ void CRedirectStdinRemote::handleOutput(ssize_t count, char *buffer) if (NameServerEnabled) { - PtpClient->StdIoData( requesterNid_ - , pid_ - , STDIN_DATA - , count - , buffer ); + PtpClient->ProcessStdIoData( requesterNid_ + , pid_ + , STDIN_DATA + , count + , buffer ); } else { @@ -1177,11 +1180,11 @@ void CRedirectAncestorStdout::handleOutput(ssize_t count, char *buffer) if (NameServerEnabled) { - PtpClient->StdIoData( ancestor_nid_ - , ancestor_pid_ - , STDOUT_DATA - , count - , buffer ); + PtpClient->ProcessStdIoData( ancestor_nid_ + , ancestor_pid_ + , STDOUT_DATA + , count + , buffer ); } else { @@ -1654,11 +1657,11 @@ void CRedirector::stdinFd(int nid, int pid, int &pipeFd, char filename[], if (NameServerEnabled) { - PtpClient->StdInReq( nid - , pid - , STDIN_REQ_DATA - , ancestor_nid - , ancestor_pid ); + PtpClient->ProcessStdInReq( nid + , pid + , STDIN_REQ_DATA + , ancestor_nid + , ancestor_pid ); } else { http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqdump.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqdump.cxx b/core/sqf/monitor/linux/reqdump.cxx index 5d2dd5e..fda3cea 100644 --- a/core/sqf/monitor/linux/reqdump.cxx +++ b/core/sqf/monitor/linux/reqdump.cxx @@ -129,8 +129,11 @@ void CExtDumpReq::performRequest() { if ( target_process_name.size() ) { // find by name - targetProcess = Nodes->GetProcess( target_process_name.c_str() - , target_verifier ); + if (msg_->u.request.u.dump.target_process_name[0] == '$' ) + { + targetProcess = Nodes->GetProcess( target_process_name.c_str() + , target_verifier ); + } } else { // find by nid, pid @@ -152,9 +155,12 @@ void CExtDumpReq::performRequest() , target_process_name.c_str() , target_verifier ); } - cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str() - , target_verifier ); - targetProcess = cloneProcess; + if (msg_->u.request.u.dump.target_process_name[0] == '$' ) + { + cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str() + , target_verifier ); + targetProcess = cloneProcess; + } } else { // Name Server find by nid,pid:verifier http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqevent.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqevent.cxx b/core/sqf/monitor/linux/reqevent.cxx index 01c9067..f86582d 100644 --- a/core/sqf/monitor/linux/reqevent.cxx +++ b/core/sqf/monitor/linux/reqevent.cxx @@ -163,8 +163,11 @@ void CExtEventReq::performRequest() if ( target_process_name.size() ) { // find by name - targetProcess = Nodes->GetProcess( target_process_name.c_str() - , target_verifier ); + if (msg_->u.request.u.event.target_process_name[0] == '$' ) + { + targetProcess = Nodes->GetProcess( target_process_name.c_str() + , target_verifier ); + } if ( !targetProcess ) { if (NameServerEnabled) @@ -176,9 +179,12 @@ void CExtEventReq::performRequest() , target_process_name.c_str() , target_verifier ); } - cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str() - , target_verifier ); - targetProcess = cloneProcess; + if (msg_->u.request.u.event.target_process_name[0] == '$' ) + { + cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str() + , target_verifier ); + targetProcess = cloneProcess; + } } } if ( targetProcess && trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqkill.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqkill.cxx b/core/sqf/monitor/linux/reqkill.cxx index b59cae2..a7f7b62 100644 --- a/core/sqf/monitor/linux/reqkill.cxx +++ b/core/sqf/monitor/linux/reqkill.cxx @@ -211,14 +211,17 @@ void CExtKillReq::performRequest() { if ( target_process_name.size() ) { // find by name (check node state, don't check process state, not backup) - targetProcess = Nodes->GetProcess( target_process_name.c_str() - , target_verifier - , true, false, false ); - if ( targetProcess && - (msg_->u.request.u.kill.target_nid == -1 || - msg_->u.request.u.kill.target_pid == -1)) + if (msg_->u.request.u.kill.target_process_name[0] == '$' ) { - backup = targetProcess->GetBackup (); + targetProcess = Nodes->GetProcess( target_process_name.c_str() + , target_verifier + , true, false, false ); + if ( targetProcess && + (msg_->u.request.u.kill.target_nid == -1 || + msg_->u.request.u.kill.target_pid == -1)) + { + backup = targetProcess->GetBackup (); + } } } else @@ -256,9 +259,12 @@ void CExtKillReq::performRequest() , target_process_name.c_str() , target_verifier ); } - cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str() - , target_verifier ); - targetProcess = cloneProcess; + if (msg_->u.request.u.kill.target_process_name[0] == '$' ) + { + cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str() + , target_verifier ); + targetProcess = cloneProcess; + } } else { // Name Server find by nid,pid:verifier http://git-wip-us.apache.org/repos/asf/trafodion/blob/32fe8565/core/sqf/monitor/linux/reqnotify.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqnotify.cxx b/core/sqf/monitor/linux/reqnotify.cxx index 4d278ce..5e69681 100644 --- a/core/sqf/monitor/linux/reqnotify.cxx +++ b/core/sqf/monitor/linux/reqnotify.cxx @@ -180,9 +180,12 @@ void CExtNotifyReq::performRequest() , target_process_name.c_str() , target_verifier ); } - targetProcess = Nodes->GetProcess( target_process_name.c_str() - , target_verifier - , true, false, false ); + if (msg_->u.request.u.notify.target_process_name[0] == '$' ) + { + targetProcess = Nodes->GetProcess( target_process_name.c_str() + , target_verifier + , true, false, false ); + } } else { // find by nid (check node state, don't check process state, backup is Ok) @@ -226,8 +229,11 @@ void CExtNotifyReq::performRequest() , target_process_name.c_str() , target_verifier ); } - targetProcess = Nodes->CloneProcessNs( target_process_name.c_str() - , target_verifier ); + if (msg_->u.request.u.notify.target_process_name[0] == '$' ) + { + targetProcess = Nodes->CloneProcessNs( target_process_name.c_str() + , target_verifier ); + } } else { // Name Server find by nid,pid:verifier @@ -319,7 +325,7 @@ void CExtNotifyReq::performRequest() { if (trace_settings & TRACE_REQUEST) { - trace_printf("%s@%d" " - Can't find targerProcess" "\n", method_name, __LINE__); + trace_printf("%s@%d" " - Can't find targetProcess" "\n", method_name, __LINE__); } } }