http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/process.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/process.cxx b/core/sqf/monitor/linux/process.cxx index fffd750..02d6276 100644 --- a/core/sqf/monitor/linux/process.cxx +++ b/core/sqf/monitor/linux/process.cxx @@ -203,6 +203,7 @@ CProcess::CProcess (CProcess * parent, int nid, int pid, , NoticeTail(NULL) #endif #ifdef NAMESERVER_PROCESS + , monSockFd_(-1) , origPNidNs_(-1) #endif { @@ -477,6 +478,8 @@ bool CProcess::CancelDeathNotification( int nid while( notice ) { + // This process no longer wants to be notified of death of process + // identified by nid, pid, verifier if ((( notice->Nid == nid ) && ( notice->Pid == pid ) && ( notice->verifier_ == verifier ) && @@ -593,13 +596,33 @@ void CProcess::procExitUnregAll ( _TM_Txid_External transId ) { node = Nodes->GetLNode ( it->nid ); targetProcess = NULL; - if (node) + if (node) { targetProcess = node->GetProcessL( it->pid ); } if ( targetProcess ) { + if (NameServerEnabled && targetProcess->IsClone()) + { + CLNode *targetLNode = Nodes->GetLNode( targetProcess->GetNid() ); + + int rc = -1; + // Forward the process cancel death notification to the target node + rc = PtpClient->ProcessNotify( targetProcess->GetNid() + , targetProcess->GetPid() + , targetProcess->GetVerifier() + , transId + , true // cancel target's death notification + , this // of this process + , targetLNode->GetNid() + , targetLNode->GetNode()->GetName() ); + if (rc) + { + // TODO: Error handling + } + } + targetProcess->CancelDeathNotification( Nid , Pid , Verifier @@ -626,6 +649,18 @@ void CProcess::childAdd ( int nid, int pid ) TRACE_EXIT; } +int CProcess::childCount ( void ) +{ + const char method_name[] = "CProcess::childCount"; + TRACE_ENTRY; + + childrenListLock_.lock(); + int count = children_.size(); + childrenListLock_.unlock(); + + TRACE_EXIT; + return(count); +} void CProcess::childRemove ( int nid, int pid ) { @@ -732,9 +767,6 @@ void CProcess::CompleteProcessStartup (char *port, int os_pid, bool event_messag Pid = os_pid; Event_messages = event_messages; System_messages = system_messages; -#ifdef NAMESERVER_PROCESS - origPNidNs_ = origPNidNs; -#endif if (preclone) { @@ -754,12 +786,39 @@ void CProcess::CompleteProcessStartup (char *port, int os_pid, bool event_messag if ( MyNode->IsMyNode(Nid) ) { if ( NameServerEnabled ) - NameServer->ProcessNew(this); // in reqQueue thread (CExtStartupReq) + { + int rc = -1; + // Register process in Name Server + rc = NameServer->ProcessNew(this); // in reqQueue thread (CExtStartupReq) + if (rc) + { + // TODO: Error handling + } -//TRK-TODO - ?? - // Replicate the clone to other nodes - CReplClone *repl = new CReplClone(this); - Replicator.addItem(repl); + if (Parent_Nid != -1) + { + if (Parent_Nid != Nid) + { + // Tell the parent node the current state of the process + rc = PtpClient->ProcessClone(this); + if (rc) + { + // TODO: Error handling + } + } + else + { + // TODO: Generate internal clone request? + // to update local parent? + } + } + } + else + { + // Replicate the clone to other nodes + CReplClone *repl = new CReplClone(this); + Replicator.addItem(repl); + } } else { @@ -768,9 +827,34 @@ void CProcess::CompleteProcessStartup (char *port, int os_pid, bool event_messag } else { - // Replicate the clone to other nodes - CReplClone *repl = new CReplClone(this); - Replicator.addItem(repl); + // TODO: What does an os_pid == -1 mean? + if ( NameServerEnabled ) + { + if (Parent_Nid != -1) + { + if (Parent_Nid != Nid) + { + int rc = -1; + // Tell the parent node the current state of the process + rc = PtpClient->ProcessClone(this); + if (rc) + { + // TODO: Error handling + } + } + else + { + // TODO: Generate internal clone request? + // to update local parent? + } + } + } + else + { + // Replicate the clone to other nodes + CReplClone *repl = new CReplClone(this); + Replicator.addItem(repl); + } } } @@ -2797,6 +2881,15 @@ void CProcess::Exit( CProcess *parent ) } #endif + if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) + trace_printf( "%s@%d" " - Process %s (%d,%d:%d) is exiting, parent process %s (%d,%d:%d)\n" + , method_name, __LINE__ + , GetName(), GetNid(), GetPid(), GetVerifier() + , parent?parent->GetName():"" + , parent?parent->GetNid():-1 + , parent?parent->GetPid():-1 + , parent?parent->GetVerifier():-1 ); + SetState(State_Stopped); // if the env is set to not deliver death messages upon node down, @@ -2817,7 +2910,12 @@ void CProcess::Exit( CProcess *parent ) !(Type == ProcessType_DTM && IsAbended()) && supplyProcessDeathNotices ) { - // Notify all registered processes of this process' death + if ( !Clone && NameServerEnabled ) + { + // Notify all remote registered nodes of this process' death + NoticeHead->NotifyRemote(); + } + // Notify all local registered processes of this process' death NoticeHead->NotifyAll(); } #endif @@ -3120,6 +3218,55 @@ void CProcess::Exit( CProcess *parent ) trace_printf("%s@%d" " - Parent doesn't want Death message" "\n", method_name, __LINE__); } } + +#ifndef NAMESERVER_PROCESS + if (NameServerEnabled) + { + if ( parent && parent->IsClone() && Pid != -1 ) + { + int targetNid = parent->GetNid(); + CLNode *targetLNode = Nodes->GetLNode( targetNid ); + // Send the process exit to the target node + int rc = PtpClient->ProcessExit( this + , targetNid + , targetLNode->GetNode()->GetName() ); + if (rc) + { + // TODO: Error handling + } +#if 0 + // TODO: This is not the correct place. It needs to be found! + // When the parent process is in a remote node and + // the local node contains child processes, + // a clone of the parent is created at child creation time, + // when all child processes are deleted, it leaves the + // parent clone process. Need to determine when all + // child process objects which reference the parent clone + // are deleted so the parent clone object can be deleted. + // The symptom is that shutdown never occurs since there + // are object which have not been deleted and the process + // counts prevent the shutdown from completing. + if (parent->childCount() == 0) + { + if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d" " - Deleting parent %s (%d,%d:%d) of last child %s (%d,%d:%d) \n" + , method_name, __LINE__ + , parent->GetName(), parent->GetNid() + , parent->GetPid(), parent->GetVerifier() + , GetName(), GetNid(), GetPid(), GetVerifier() ); + } + + CNode *parentNode = Nodes->GetLNode(parent->GetNid())->GetNode(); + parentNode->DelFromNameMap( parent ); + parentNode->DelFromPidMap( parent ); + parentNode->DeleteFromList( parent ); + } +#endif + } + } +#endif + TRACE_EXIT; } @@ -3482,6 +3629,7 @@ CNotice *CProcess::RegisterDeathNotification( int nid } #endif +#ifndef NAMESERVER_PROCESS void CProcess::ReplyNewProcess (struct message_def * reply_msg, CProcess * process, int result) { @@ -3505,14 +3653,13 @@ void CProcess::ReplyNewProcess (struct message_def * reply_msg, process->Name, process->Nid, process->Pid, process->Verifier, Name, Nid, Pid, result); -#ifndef NAMESERVER_PROCESS // send reply to the parent SQ_theLocalIOToClient->sendCtlMsg ( Pid, MC_SReady, ((SharedMsgDef*)reply_msg)-> trailer.index ); -#endif TRACE_EXIT; } +#endif #ifndef NAMESERVER_PROCESS @@ -4099,7 +4246,7 @@ void CProcessContainer::AttachProcessCheck ( struct message_def *msg ) msg->u.reply.u.generic.return_code = MPI_ERR_NAME; } } - // check if its a attach request, if so setup the process + // check if its an attach request, if so setup the process else if ((msg->u.request.u.startup.nid == -1) && (msg->u.request.u.startup.pid == -1) ) { @@ -4383,14 +4530,18 @@ bool CProcessContainer::CancelDeathNotification( int nid #endif #ifndef NAMESERVER_PROCESS +// Child_Exit terminates all child processes created by the parent process +// unless the child process is Unhooked from the parent process void CProcessContainer::Child_Exit ( CProcess * parent ) { CProcess *process; const char method_name[] = "CProcessContainer::Child_Exit"; TRACE_ENTRY; - if (trace_settings & TRACE_ENTRY_EXIT) - trace_printf("%s@%d with parent (%d, %d)\n", method_name, __LINE__, parent->GetNid(), parent->GetPid() ); + if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) + trace_printf( "%s@%d for parent (%d, %d:%d)\n" + , method_name, __LINE__ + , parent->GetNid(), parent->GetPid(), parent->GetVerifier() ); if ( parent && ((MyNode->GetState() != State_Shutdown && @@ -4398,26 +4549,26 @@ void CProcessContainer::Child_Exit ( CProcess * parent ) || (parent->GetType() == ProcessType_SPX) ) ) { CProcess::nidPid_t child; - CLNode * childNode; + CLNode * childLNode; while ( parent->childRemoveFirst ( child )) { - childNode = Nodes->GetLNode( child.nid ); - process = (childNode != NULL ) - ? childNode->GetNode()->GetProcess( child.pid ) : NULL; + childLNode = Nodes->GetLNode( child.nid ); + process = (childLNode != NULL ) + ? childLNode->GetNode()->GetProcess( child.pid ) : NULL; if ( process && (!process->IsUnhooked()) ) { if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) - trace_printf("%s@%d - Child process %s (%d, %d) exits due " - "to parent death (%d, %d)\n", + trace_printf("%s@%d - Child process %s (%d, %d:%d) exits due " + "to parent death (%d, %d:%d)\n", method_name, __LINE__, process->GetName(), - process->GetNid(), process->GetPid(), - parent->GetNid(), parent->GetPid()); + process->GetNid(), process->GetPid(), process->GetVerifier(), + parent->GetNid(), parent->GetPid(), parent->GetVerifier()); - childNode->SetProcessState( process, State_Down, true ); + childLNode->SetProcessState( process, State_Down, true ); if ( !process->IsClone() ) { if ( parent->GetType() == ProcessType_SPX ) @@ -4429,6 +4580,19 @@ void CProcessContainer::Child_Exit ( CProcess * parent ) kill (process->GetPid(), Monitor->GetProcTermSig()); } } + else + { + if (NameServerEnabled) + { + CNode *childNode = childLNode->GetNode(); + // Forward the process create to the target node + PtpClient->ProcessKill( process + , process->GetAbort() + , childLNode->GetNid() + , childNode->GetName()); + } + } + if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) trace_printf("%s@%d - Completed kill for child process %s (%d, %d)\n", method_name, __LINE__, process->GetName(), process->GetNid(), process->GetPid()); } @@ -4948,6 +5112,7 @@ void CProcessContainer::DumpCallback( int nid, pid_t pid, int status ) #endif +#ifndef NAMESERVER_PROCESS CProcess * CProcessContainer::ParentNewProcReply ( CProcess *process, int result ) { const char method_name[] = "CProcessContainer::ParentNewProcReply"; @@ -4964,10 +5129,8 @@ CProcess * CProcessContainer::ParentNewProcReply ( CProcess *process, int result // If we have a parent process then it is expecting a reply if (parent && !parent->IsClone() && !parent->IsPaired()) { -#ifndef NAMESERVER_PROCESS if (!process->IsNowait()) { // The new process request was "waited" so send reply now -#endif struct message_def *reply_msg; reply_msg = process->parentContext(); @@ -4979,19 +5142,18 @@ CProcess * CProcessContainer::ParentNewProcReply ( CProcess *process, int result // buffer) is no longer valid. process->parentContext( NULL ); } -#ifndef NAMESERVER_PROCESS } else { // The new process request was "no-wait" so send notice now process->SendProcessCreatedNotice(parent, result); } -#endif } TRACE_EXIT; return parent; } +#endif #ifndef NAMESERVER_PROCESS void CProcessContainer::Exit_Process (CProcess *process, bool abend, int downNode) @@ -5030,6 +5192,7 @@ void CProcessContainer::Exit_Process (CProcess *process, bool abend, int downNod mon_log_write(MON_PROCESSCONT_EXITPROCESS_1, SQ_LOG_ERR, la_buf); abort(); } + if ( process->GetState() == State_Stopped ) { if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) @@ -5072,9 +5235,7 @@ void CProcessContainer::Exit_Process (CProcess *process, bool abend, int downNod // Unregister any interest in other process' death _TM_Txid_External transid; transid = invalid_trans(); -#ifndef NAMESERVER_PROCESS process->procExitUnregAll( transid ); -#endif // Handle the process termination process->Exit( parent ); @@ -5096,14 +5257,9 @@ void CProcessContainer::Exit_Process (CProcess *process, bool abend, int downNod { if (!process->IsClone() && !MyNode->isInQuiesceState()) { - // Replicate the exit to other nodes -//TRK-TODO - // if (NameServerEnabled) - { - //message to monitor - } - // else + if (!NameServerEnabled) { + // Replicate the exit to other nodes CReplExit *repl = new CReplExit(process->GetNid(), process->GetPid(), process->GetVerifier(), @@ -5137,12 +5293,7 @@ void CProcessContainer::Exit_Process (CProcess *process, bool abend, int downNod process->IsAbended() && MyNode->GetShutdownLevel() == ShutdownLevel_Undefined) ) { -//TRK-TODO - // if (NameServerEnabled) - { - //message to monitor - } - // else + if (!NameServerEnabled) { // Replicate the exit to other nodes CReplExit *repl = new CReplExit(process->GetNid(),
http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/process.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/process.h b/core/sqf/monitor/linux/process.h index 074c13d..9445f6e 100644 --- a/core/sqf/monitor/linux/process.h +++ b/core/sqf/monitor/linux/process.h @@ -127,9 +127,11 @@ class CProcessContainer ); bool Dump_Process( CProcess *dumper, CProcess *process, char *core_path ); void DumpCallback( int nid, pid_t pid, int status ); - static CProcess *ParentNewProcReply ( CProcess *process, int result ); #ifndef NAMESERVER_PROCESS + static CProcess *ParentNewProcReply ( CProcess *process, int result ); void Exit_Process( CProcess *process, bool abend, int downNode ); +#else + static CProcess *MonReply ( CProcess *process, int result ); #endif CProcess *GetFirstProcess( void ) { return(head_); }; CProcess *GetLastProcess( void ) { return(tail_); }; @@ -299,6 +301,7 @@ class CProcess inline void SetPairParentVerifier( int verifier ) { PairParentVerifier = verifier; } inline int GetPriority ( ) { return Priority; } inline void SetTag ( long long tag ) { Tag = tag; } + inline int GetReplyTag ( ) { return ReplyTag; } inline void SetReplyTag ( int tag ) { ReplyTag = tag; } inline const char * GetPort ( ) { return Port; } inline PROCESSTYPE GetType ( ) { return Type; } @@ -317,6 +320,8 @@ class CProcess inline int GetDumperVerifier ( ) { return DumperVerifier; } inline const char * GetDumpFile () { return dumpFile_.c_str(); } #ifdef NAMESERVER_PROCESS + inline int GetMonSockFd( ) { return monSockFd_; } + inline void SetMonSockFd( int sockFd ) { monSockFd_ = sockFd; } inline int GetOrigPNidNs( ) { return origPNidNs_; } inline void SetOrigPNidNs( int pnid ) { origPNidNs_ = pnid; } #endif @@ -342,9 +347,9 @@ class CProcess , Verifier_t verifier , const char *name , _TM_Txid_External trans_id ); -#endif void ReplyNewProcess (struct message_def * reply_msg, CProcess * process, int result); +#endif void SendProcessCreatedNotice(CProcess *parent, int result); struct timespec GetCreationTime () { return CreationTime; } void SetCreationTime(int os_pid); @@ -383,13 +388,19 @@ class CProcess inline int decrReplRef() { --replRefCount_; return replRefCount_; } inline int replRefCount() { return replRefCount_; } +#ifndef NAMESERVER_PROCESS void parentContext (struct message_def * msg) { requestBuf_ = msg; } struct message_def * parentContext ( void ) { return requestBuf_; } +#else + void SetMonContext (struct message_def * msg) { requestBuf_ = msg; } + struct message_def * GetMonContext ( void ) { return requestBuf_; } +#endif void SetHangupTime () { clock_gettime(CLOCK_REALTIME, &hangupTime_); } time_t GetHangupTime () { return hangupTime_.tv_sec; } void childAdd ( int nid, int pid ); + int childCount ( void ); void childRemove ( int nid, int pid ); bool childRemoveFirst ( nidPid_t & child ); @@ -545,6 +556,7 @@ private: CNotice *NoticeTail; #endif #ifdef NAMESERVER_PROCESS + int monSockFd_; int origPNidNs_; #endif }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/ptpclient.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx index 1d75cc6..5e1380e 100644 --- a/core/sqf/monitor/linux/ptpclient.cxx +++ b/core/sqf/monitor/linux/ptpclient.cxx @@ -45,31 +45,40 @@ using namespace std; #include "lnode.h" #include "pnode.h" #include "ptpclient.h" +#include "monitor.h" #include "monlogging.h" #include "montrace.h" #include "meas.h" +extern CMonitor *Monitor; extern CNode *MyNode; +extern CNodeContainer *Nodes; extern bool IsRealCluster; extern CMeas Meas; CPtpClient::CPtpClient (void) -: mon2monSock_(0) -, seqNum_(0) + : mon2monSock_(0) + , seqNum_(0) { const char method_name[] = "CPtpClient::CPtpClient"; TRACE_ENTRY; - // revisit - probably can use the one we already read in - char * p = getenv( "MONITOR_COMM_PORT" ); + char * p = getenv( "MON2MON_COMM_PORT" ); if ( p ) { basePort_ = atoi( p ); } else { - basePort_ = 23399; // constant somewhere + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s@%d] MON2MON_COMM_PORT environment variable is not set!\n" + , method_name, __LINE__ ); + mon_log_write( PTPCLIENT_PTPCLIENT_1, SQ_LOG_CRIT, buf ); + abort(); } + + TRACE_EXIT; } CPtpClient::~CPtpClient (void) @@ -80,166 +89,18 @@ CPtpClient::~CPtpClient (void) TRACE_EXIT; } -int CPtpClient::MkCltSock( const char *portName ) -{ - const char method_name[] = "CPtpClient::MkCltSock"; - TRACE_ENTRY; - - int sock; // socket - int ret; // returned value - int reuse = 1; // sockopt reuse option - socklen_t size; // size of socket address - static int retries = 0; // # times to retry connect - int outer_failures = 0; // # failed connect loops - int connect_failures = 0; // # failed connects - char *p; // getenv results - struct sockaddr_in sockinfo; // socket address info - struct hostent *he; - char host[1000]; - const char *colon; - unsigned int port; - - colon = strstr(portName, ":"); - strcpy(host, portName); - int len = colon - portName; - host[len] = '\0'; - port = atoi(&colon[1]); - - size = sizeof(sockinfo); - - if ( !retries ) - { - p = getenv( "HPMP_CONNECT_RETRIES" ); - if ( p ) retries = atoi( p ); - else retries = 5; - } - - for ( ;; ) - { - sock = socket( AF_INET, SOCK_STREAM, 0 ); - if ( sock < 0 ) - { - char la_buf[MON_STRING_BUF_SIZE]; - int err = errno; - snprintf( la_buf, sizeof(la_buf) - , "[%s], socket() failed! errno=%d (%s)\n" - , method_name, err, strerror( err )); - mon_log_write(MON_CLUSTER_MKCLTSOCK_1, SQ_LOG_ERR, la_buf); - return ( -1 ); - } - - he = gethostbyname( host ); - if ( !he ) - { - char la_buf[MON_STRING_BUF_SIZE]; - int err = h_errno; - snprintf( la_buf, sizeof(la_buf), - "[%s] gethostbyname(%s) failed! errno=%d (%s)\n" - , method_name, host, err, strerror( err )); - mon_log_write(MON_CLUSTER_MKCLTSOCK_2, SQ_LOG_ERR, la_buf); - close( sock ); - return ( -1 ); - } - - // Connect socket. - memset( (char *) &sockinfo, 0, size ); - memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 ); - sockinfo.sin_family = AF_INET; - sockinfo.sin_port = htons( (unsigned short) port ); - - // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES, - // and has a yield between each retry, since it's more oriented - // toward failures from network overload and putting a pause - // between retries. This inner loop should only iterate when - // a signal interrupts the local process, so it doesn't pause - // or use the same HPMP_CONNECT_RETRIES count. - connect_failures = 0; - ret = 1; - while ( ret != 0 && connect_failures <= 10 ) - { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - Connecting to %s addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" - , method_name, __LINE__ - , host - , (int)((unsigned char *)he->h_addr)[0] - , (int)((unsigned char *)he->h_addr)[1] - , (int)((unsigned char *)he->h_addr)[2] - , (int)((unsigned char *)he->h_addr)[3] - , port - , connect_failures ); - } - - ret = connect( sock, (struct sockaddr *) &sockinfo, size ); - if ( ret == 0 ) break; - if ( errno == EINTR ) - { - ++connect_failures; - } - else - { - char la_buf[MON_STRING_BUF_SIZE]; - int err = errno; - sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" - , method_name, err, strerror( err )); - mon_log_write(MON_CLUSTER_MKCLTSOCK_3, SQ_LOG_ERR, la_buf); - close(sock); - return ( -1 ); - } - } - - if ( ret == 0 ) break; - - // For large clusters, the connect/accept calls seem to fail occasionally, - // no doubt do to the large number (1000's) of simultaneous connect packets - // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES - // number of times. - if ( errno != EINTR ) - { - if ( ++outer_failures > retries ) - { - char la_buf[MON_STRING_BUF_SIZE]; - sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n" - , method_name, retries); - mon_log_write(MON_CLUSTER_MKCLTSOCK_4, SQ_LOG_ERR, la_buf); - close( sock ); - return ( -1 ); - } - struct timespec req, rem; - req.tv_sec = 0; - req.tv_nsec = 500000; - nanosleep( &req, &rem ); - } - close( sock ); - } - - if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) ) - { - char la_buf[MON_STRING_BUF_SIZE]; - int err = errno; - sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n" - , method_name, err, strerror( err )); - mon_log_write(MON_CLUSTER_MKCLTSOCK_5, SQ_LOG_ERR, la_buf); - close( (int)sock ); - return ( -2 ); - } - - TRACE_EXIT; - return ( sock ); -} - int CPtpClient::InitializePtpClient( char * mon2monPort ) { const char method_name[] = "CPtpClient::InitializePtpClient"; TRACE_ENTRY; int err = 0; - int sock = MkCltSock(mon2monPort); + int sock = Monitor->MkCltSock( mon2monPort ); if (sock < 0) { err = sock; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - MkCltSock failed with error %d\n" , method_name, __LINE__, err ); @@ -248,15 +109,15 @@ int CPtpClient::InitializePtpClient( char * mon2monPort ) else { mon2monSock_ = sock; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { - trace_printf( "%s@%d - connected to nameserver=%s, sock=%d\n" + trace_printf( "%s@%d - connected to monitor node=%s, sock=%d\n" , method_name, __LINE__ , mon2monPort , mon2monSock_ ); } } - +#if 0 // remove if (err == 0) { @@ -290,16 +151,347 @@ int CPtpClient::InitializePtpClient( char * mon2monPort ) } } } - +#endif TRACE_EXIT; return err; } -int CPtpClient::NewProcess(CProcess* process, int receiveNode, const char *hostName) +int CPtpClient::ProcessClone( CProcess *process ) +{ + const char method_name[] = "CPtpClient::ProcessClone"; + TRACE_ENTRY; + + CLNode *parentLNode = NULL; + if (process->GetParentNid() != -1) + { + parentLNode = Nodes->GetLNode( process->GetParentNid() ); + } + + if (parentLNode == NULL) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Clone request to parentNid=%d\n" + ", process=%s (%d:%d:%d)\n" + , method_name, __LINE__ + , process->GetParentNid() + , process->GetName() + , process->GetNid() + , process->GetPid() + , process->GetVerifier() ); + } + return(0); + } + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d\n" + ", process=%s (%d:%d:%d)\n" + , method_name, __LINE__ + , parentLNode->GetNode()->GetName() + , process->GetParentNid() + , process->GetName() + , process->GetNid() + , process->GetPid() + , process->GetVerifier() ); + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_Clone; + msg.u.clone.backup = process->IsBackup(); + msg.u.clone.unhooked = process->IsUnhooked(); + msg.u.clone.event_messages = process->IsEventMessages(); + msg.u.clone.system_messages = process->IsSystemMessages(); + msg.u.clone.nid = process->GetNid(); + msg.u.clone.verifier = process->GetVerifier(); + msg.u.clone.type = process->GetType(); + msg.u.clone.priority = process->GetPriority(); + msg.u.clone.parent_nid = process->GetParentNid(); + msg.u.clone.parent_pid = process->GetParentPid(); + msg.u.clone.parent_verifier = process->GetParentVerifier(); + msg.u.clone.os_pid = process->GetPid(); + msg.u.clone.persistent = process->IsPersistent(); + msg.u.clone.persistent_retries = process->GetPersistentRetries(); + msg.u.clone.origPNidNs= -1; + msg.u.clone.argc = process->argc(); + msg.u.clone.creation_time = process->GetCreationTime(); + msg.u.clone.pathStrId = process->pathStrId(); + msg.u.clone.ldpathStrId = process->ldPathStrId(); + msg.u.clone.programStrId = process->programStrId(); + + msg.u.clone.prior_pid = process->GetPriorPid (); + process->SetPriorPid ( 0 ); + msg.u.clone.creation_time = process->GetCreationTime(); + + char *stringData = & msg.u.clone.stringData; + int nameLen = strlen(process->GetName()) + 1; + int portLen = strlen(process->GetPort()) + 1; + int infileLen = strlen(process->infile()) + 1; + int outfileLen = strlen(process->outfile()) + 1; + int argvLen = process->userArgvLen(); + + // Copy the process name + msg.u.clone.nameLen = nameLen; + memcpy( stringData, process->GetName(), nameLen ); + stringData += nameLen; + + // Copy the port + msg.u.clone.portLen = portLen; + memcpy(stringData, process->GetPort(), portLen ); + stringData += portLen; + + // Copy the standard in file name + msg.u.clone.infileLen = infileLen; + memcpy( stringData, process->infile(), infileLen ); + stringData += infileLen ; + + // Copy the standard out file name + msg.u.clone.outfileLen = outfileLen; + memcpy( stringData, process->outfile(), outfileLen ); + stringData += outfileLen ; + + // Copy the program argument strings + msg.u.clone.argvLen = argvLen; + memcpy( stringData, process->userArgv(), argvLen ); + + int size = offsetof(struct internal_msg_def, u); + size += sizeof(msg.u.clone); + size += nameLen ; + size += portLen ; + size += infileLen ; + size += outfileLen ; + size += argvLen ; + + if (trace_settings & TRACE_PROCESS_DETAIL) + { + trace_printf( "%s@%d - size_=%d, programStrId=(%d,%d), " + "pathStrId=(%d,%d), ldPathStrId=(%d,%d), " + "name=%s, strlen(name)=%d, " + "port=%s, strlen(port)=%d, " + "infile=%s, strlen(infile)=%d, " + "outfile=%s, strlen(outfile)=%d, " + "argc=%d, strlen(total argv)=%d, args=[%.*s]\n" + , method_name, __LINE__ + , size + , msg.u.clone.programStrId.nid + , msg.u.clone.programStrId.id + , msg.u.clone.pathStrId.nid + , msg.u.clone.pathStrId.id + , msg.u.clone.ldpathStrId.nid + , msg.u.clone.ldpathStrId.id + , &msg.u.clone.stringData + , nameLen + , &msg.u.clone.stringData+nameLen + , portLen + , &msg.u.clone.stringData+nameLen + , infileLen + , &msg.u.clone.stringData+nameLen+infileLen + , outfileLen + , msg.u.clone.argc + , argvLen + , argvLen + , &msg.u.clone.stringData+nameLen+infileLen+outfileLen); + } + + int error = SendToMon( "process-clone" + , &msg + , size + , process->GetParentNid() + , parentLNode->GetNode()->GetName()); + + TRACE_EXIT; + return error; +} + +int CPtpClient::ProcessExit( CProcess *process + , int targetNid + , const char *targetNodeName ) { - const char method_name[] = "CPtpClient::NewProcess"; + const char method_name[] = "CPtpClient::ProcessExit"; TRACE_ENTRY; - // printf("\nTRK NewProcess, receiveNode %d, hostname %s\n", receiveNode, hostName); + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_Exit request to %s, targetNid=%d" + ", process=%s (%d,%d:%d) is exiting\n" + , method_name, __LINE__ + , targetNodeName + , targetNid + , process->GetName() + , process->GetNid() + , process->GetPid() + , process->GetVerifier() ); + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_Exit; + msg.u.exit.nid = process->GetNid(); + msg.u.exit.pid = process->GetPid(); + msg.u.exit.verifier = process->GetVerifier(); + strcpy(msg.u.exit.name, process->GetName()); + msg.u.exit.abended = process->IsAbended(); + + int size = offsetof(struct internal_msg_def, u); + size += sizeof(msg.u.exit); + + if (trace_settings & TRACE_PROCESS_DETAIL) + { + trace_printf( "%s@%d - size_=%d, process %s (%d,%d:%d) " + "abended=%d\n" + , method_name, __LINE__ + , size + , msg.u.exit.name + , msg.u.exit.nid + , msg.u.exit.pid + , msg.u.exit.verifier + , msg.u.exit.abended ); + } + + int error = SendToMon("process-exit", &msg, size, targetNid, targetNodeName); + + TRACE_EXIT; + return error; +} + +int CPtpClient::ProcessInit( CProcess *process + , void *tag + , int result + , int parentNid ) +{ + const char method_name[] = "CPtpClient::ProcessInit"; + TRACE_ENTRY; + + CLNode *parentLNode = NULL; + if (process->GetParentNid() != -1) + { + parentLNode = Nodes->GetLNode( process->GetParentNid() ); + } + + if (parentLNode == NULL) + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not Sending InternalType_Clone request to parentNid=%d\n" + ", process=%s (%d,%d:%d)\n" + , method_name, __LINE__ + , process->GetParentNid() + , process->GetName() + , process->GetNid() + , process->GetPid() + , process->GetVerifier() ); + } + return(0); + } + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Sending InternalType_ProcessInit to parent node %s, parentNid=%d\n" + ", for process %s (%d,%d:%d)\n" + , method_name, __LINE__ + , parentLNode->GetNode()->GetName() + , parentNid + , process->GetName() + , process->GetNid() + , process->GetPid() + , process->GetVerifier() ); + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_ProcessInit; + msg.u.processInit.nid = process->GetNid(); + msg.u.processInit.pid = process->GetPid(); + msg.u.processInit.verifier = process->GetVerifier(); + strcpy (msg.u.processInit.name, process->GetName()); + msg.u.processInit.state = process->GetState(); + msg.u.processInit.result = result; + msg.u.processInit.tag = tag; + msg.u.processInit.origNid = process->GetParentNid(); + + int size = offsetof(struct internal_msg_def, u); + size += sizeof(msg.u.processInit); + + int error = SendToMon( "process-init" + , &msg + , size + , parentNid + , parentLNode->GetNode()->GetName() ); + + TRACE_EXIT; + return error; + +} + +int CPtpClient::ProcessKill( CProcess *process + , bool abort + , int targetNid + , const char *targetNodeName ) +{ + const char method_name[] = "CPtpClient::ProcessKill"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_Kill request to %s, targetNid=%d" + ", killing process (%d,%d:%d)\n" + , method_name, __LINE__ + , targetNodeName + , targetNid + , process->GetNid() + , process->GetPid() + , process->GetVerifier() ); + } + + struct internal_msg_def msg; + memset(&msg, 0, sizeof(msg)); + msg.type = InternalType_Kill; + msg.u.kill.nid = process->GetNid(); + msg.u.kill.pid = process->GetPid(); + msg.u.kill.verifier = process->GetVerifier(); + msg.u.kill.persistent_abort = abort; + + int size = offsetof(struct internal_msg_def, u); + size += sizeof(msg.u.exit); + + if (trace_settings & TRACE_PROCESS_DETAIL) + { + trace_printf( "%s@%d - size_=%d, process (%d,%d:%d) " + "persistent_abort=%d\n" + , method_name, __LINE__ + , size + , msg.u.kill.nid + , msg.u.kill.pid + , msg.u.kill.verifier + , msg.u.kill.persistent_abort ); + } + + int error = SendToMon("process-kill", &msg, size, targetNid, targetNodeName); + + TRACE_EXIT; + return error; +} + +int CPtpClient::ProcessNew( CProcess *process + , int targetNid + , const char *targetNodeName ) +{ + const char method_name[] = "CPtpClient::ProcessNew"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_Process request to %s, targetNid=%d" + ", program=%s, parent=(%d,%d:%d)\n" + , method_name, __LINE__ + , targetNodeName + , targetNid + , process->program() + , process->GetParentNid() + , process->GetParentPid() + , process->GetParentVerifier() ); + } struct internal_msg_def msg; memset(&msg, 0, sizeof(msg)); @@ -322,42 +514,155 @@ int CPtpClient::NewProcess(CProcess* process, int receiveNode, const char *hostN msg.u.process.programStrId = process->programStrId(); msg.u.process.argc = process->argc(); + char *stringData = & msg.u.process.stringData; + int nameLen = strlen(process->GetName()) + 1; + int infileLen = strlen(process->infile()) + 1; + int outfileLen = strlen(process->outfile()) + 1; + int argvLen = process->userArgvLen(); + + // Copy the process name + msg.u.process.nameLen = nameLen; + memcpy( stringData, process->GetName(), nameLen ); + stringData += nameLen; + + // Copy the standard in file name + msg.u.process.infileLen = infileLen; + memcpy( stringData, process->infile(), infileLen ); + stringData += infileLen; + + // Copy the standard out file name + msg.u.process.outfileLen = outfileLen; + memcpy( stringData, process->outfile(), outfileLen ); + stringData += outfileLen; + + // Copy the program argument strings + msg.u.process.argvLen = argvLen; + memcpy( stringData, process->userArgv(), argvLen ); + int size = offsetof(struct internal_msg_def, u); size += sizeof(msg.u.process); + size += nameLen ; + size += infileLen ; + size += outfileLen ; + size += argvLen ; - int error = SendToMon("new-process", &msg, size, receiveNode, hostName); + if (trace_settings & TRACE_PROCESS_DETAIL) + { + trace_printf( "%s@%d - size_=%d, programStrId=(%d,%d), " + "pathStrId=(%d,%d), ldPathStrId=(%d,%d), " + "name=%s, strlen(name)=%d, " + "infile=%s, strlen(infile)=%d, " + "outfile=%s, strlen(outfile)=%d, " + "argc=%d, strlen(total argv)=%d, args=[%.*s]\n" + , method_name, __LINE__ + , size + , msg.u.process.programStrId.nid + , msg.u.process.programStrId.id + , msg.u.process.pathStrId.nid + , msg.u.process.pathStrId.id + , msg.u.process.ldpathStrId.nid + , msg.u.process.ldpathStrId.id + , &msg.u.process.stringData + , nameLen + , &msg.u.process.stringData+nameLen + , infileLen + , &msg.u.process.stringData+nameLen+infileLen + , outfileLen + , msg.u.process.argc + , argvLen + , argvLen + , &msg.u.process.stringData+nameLen+infileLen+outfileLen); + } + + int error = SendToMon("process-new", &msg, size, targetNid, targetNodeName); TRACE_EXIT; return error; } -int CPtpClient::ProcessInit(CProcess *process, void *tag, int result, int receiveNode, const char *hostName) +int CPtpClient::ProcessNotify( int nid + , int pid + , Verifier_t verifier + , _TM_Txid_External transId + , bool canceled + , CProcess *targetProcess + , int targetNid + , const char *targetNodeName ) { - const char method_name[] = "CPtpClient::ProcessInit"; - TRACE_ENTRY; - - // printf("\nTRK ProcessInit, receiveNOde %d, hostname %s\n", receiveNode, hostName); + const char method_name[] = "CPtpClient::ProcessNotify"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Sending InternalType_Notify request to %s" + ", nid=%d, canceled=%d\n" + , method_name, __LINE__ + , targetNodeName + , targetNid + , canceled ); + } + struct internal_msg_def msg; memset(&msg, 0, sizeof(msg)); - msg.type = InternalType_ProcessInit; - msg.u.processInit.nid = process->GetNid(); - msg.u.processInit.pid = process->GetPid(); - msg.u.processInit.verifier = process->GetVerifier(); - strcpy (msg.u.processInit.name, process->GetName()); - msg.u.processInit.state = process->GetState(); - msg.u.processInit.result = result; - msg.u.processInit.tag = tag; - msg.u.processInit.origNid = receiveNode; - + msg.type = InternalType_Notify; + msg.u.notify.nid = nid; + msg.u.notify.pid = pid; + msg.u.notify.verifier = verifier; + msg.u.notify.canceled = canceled; + msg.u.notify.target_nid = targetProcess->GetNid(); + msg.u.notify.target_pid = targetProcess->GetPid(); + msg.u.notify.target_verifier = targetProcess->GetVerifier(); + msg.u.notify.trans_id = transId; + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + if (canceled) + { + trace_printf( "%s@%d - Process (%d, %d:%d) deleting death " + "notice interest for %s (%d, %d:%d), " + "trans_id=%lld.%lld.%lld.%lld\n" + , method_name, __LINE__ + , msg.u.notify.nid + , msg.u.notify.pid + , msg.u.notify.verifier + , targetProcess->GetName() + , msg.u.notify.target_nid + , msg.u.notify.target_pid + , msg.u.notify.target_verifier + , msg.u.notify.trans_id.txid[0] + , msg.u.notify.trans_id.txid[1] + , msg.u.notify.trans_id.txid[2] + , msg.u.notify.trans_id.txid[3] ); + } + else + { + trace_printf("%s@%d - Process (%d, %d:%d) registering interest " + "in death of process %s (%d, %d:%d), " + "trans_id=%lld.%lld.%lld.%lld\n" + , method_name, __LINE__ + , msg.u.notify.nid + , msg.u.notify.pid + , msg.u.notify.verifier + , targetProcess->GetName() + , msg.u.notify.target_nid + , msg.u.notify.target_pid + , msg.u.notify.target_verifier + , msg.u.notify.trans_id.txid[0] + , msg.u.notify.trans_id.txid[1] + , msg.u.notify.trans_id.txid[2] + , msg.u.notify.trans_id.txid[3] ); + } + } + int size = offsetof(struct internal_msg_def, u); - size += sizeof(msg.u.processInit); - - int error = SendToMon("process-init", &msg, size, receiveNode, hostName); + size += sizeof(msg.u.notify); + + int error = SendToMon("process-notify", &msg, size, targetNid, targetNodeName); TRACE_EXIT; return error; - } + int CPtpClient::ReceiveSock(char *buf, int size, int sockFd) { const char method_name[] = "CPtpClient::ReceiveSock"; @@ -376,7 +681,7 @@ int CPtpClient::ReceiveSock(char *buf, int size, int sockFd) , sizeCount , 0 ); if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount ); - + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) { trace_printf( "%s@%d - Count read %d = recv(%d)\n" @@ -448,7 +753,7 @@ int CPtpClient::SendSock(char *buf, int size, int sockFd) , size , 0 ); if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount ); - + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) { trace_printf( "%s@%d - send(), sendCount=%d\n" @@ -511,18 +816,35 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, tempPort += receiveNode; } - memset(&mon2monPort, 0, MAX_PROCESSOR_NAME); - memset(&mon2monPortBase_, 0, MAX_PROCESSOR_NAME+100); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - reqType=%s, hostName=%s, receiveNode=%d, " + "tempPort=%d, basePort_=%d\n" + , method_name, __LINE__ + , reqType + , hostName + , receiveNode + , tempPort + , basePort_ ); + } - strcat (mon2monPortBase_, hostName); - strcat(mon2monPortBase_, ":"); - sprintf(monPortString,"%d", tempPort); - strcat (mon2monPort, mon2monPortBase_); - strcat (mon2monPort, monPortString); + memset( &mon2monPort, 0, MAX_PROCESSOR_NAME ); + memset( &mon2monPortBase_, 0, MAX_PROCESSOR_NAME+100 ); - InitializePtpClient(mon2monPort); - - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + strcat( mon2monPortBase_, hostName ); + strcat( mon2monPortBase_, ":" ); + sprintf( monPortString,"%d", tempPort ); + strcat( mon2monPort, mon2monPortBase_ ); + strcat( mon2monPort, monPortString ); + + int error = InitializePtpClient( mon2monPort ); + if (error < 0) + { + TRACE_EXIT; + return error; + } + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - sending %s REQ to Monitor=%s, sock=%d\n" , method_name, __LINE__ @@ -531,10 +853,10 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, , mon2monSock_); } - int error = SendSock((char *) &size, sizeof(size), mon2monSock_); + error = SendSock((char *) &size, sizeof(size), mon2monSock_); if (error) { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - error sending to Monitor=%s, sock=%d, error=%d\n" , method_name, __LINE__ @@ -547,7 +869,7 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, error = SendSock((char *) msg, size, mon2monSock_); if (error) { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { trace_printf( "%s@%d - error sending to nameserver=%s, sock=%d, error=%d\n" , method_name, __LINE__ @@ -558,6 +880,7 @@ int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size, } close( mon2monSock_ ); + TRACE_EXIT; return error; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/ptpclient.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h index 73dafd1..7dd8b86 100644 --- a/core/sqf/monitor/linux/ptpclient.h +++ b/core/sqf/monitor/linux/ptpclient.h @@ -23,8 +23,8 @@ // /////////////////////////////////////////////////////////////////////////////// -#ifndef PTPSERVER_H_ -#define PTPSERVER_H_ +#ifndef PTPCLIENT_H_ +#define PTPCLIENT_H_ #ifndef NAMESERVER_PROCESS #include "process.h" @@ -41,26 +41,40 @@ public: virtual ~CPtpClient( void ); int InitializePtpClient( char * mon2monPort ); - int NewProcess(CProcess* proces, int receiveNode, const char *hostName); - int ProcessInit(CProcess *process, void *tag, int result, int receiveNode, const char *hostName); -/* //TRK-TODO - Need methods for these message types: - InternalType_Clone - InternalType_Open - InternalType_Notify - InternalType_Exit -*/ + int ProcessClone( CProcess *process ); + int ProcessExit( CProcess* process + , int parentNid + , const char *targetNodeName ); + int ProcessInit( CProcess *process + , void *tag + , int result + , int parentNid ); + int ProcessKill( CProcess* process + , bool abort + , int targetNid + , const char *targetNodeName ); + int ProcessNew( CProcess* process + , int targetNid + , const char *targetNodeName ); + int ProcessNotify( int nid + , int pid + , Verifier_t verifier + , _TM_Txid_External transId + , bool canceled + , CProcess *targetProcess + , int targetNid + , const char *targetNodeName ); + private: int basePort_; char mon2monPortBase_[MAX_PROCESSOR_NAME+100]; int mon2monSock_; int seqNum_; - int MkCltSock( const char *portName ); int ReceiveSock(char *buf, int size, int sockFd); int SendSock(char *buf, int size, int sockFd); int SendToMon(const char *reqType, internal_msg_def *msg, int size, int receiveNode, const char *hostName); }; #endif -#endif /*PTPSERVER_H_*/ +#endif /*PTPCLIENT_H_*/ http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/ptpcommaccept.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx index 8829f28..b070508 100644 --- a/core/sqf/monitor/linux/ptpcommaccept.cxx +++ b/core/sqf/monitor/linux/ptpcommaccept.cxx @@ -64,159 +64,21 @@ CPtpCommAccept::~CPtpCommAccept() TRACE_EXIT; } -// REMOVE -bool CPtpCommAccept::sendNodeInfoSock( int sockFd ) -{ - const char method_name[] = "CPtpCommAccept::sendNodeInfoSock"; - TRACE_ENTRY; - bool sentData = true; - - int pnodeCount = Nodes->GetPNodesCount(); - - nodeId_t *nodeInfo; - size_t nodeInfoSize = (sizeof(nodeId_t) * pnodeCount); - nodeInfo = (nodeId_t *) new char[nodeInfoSize]; - int rc; - - CNode *node; - - for (int i=0; i<pnodeCount; ++i) - { - node = Nodes->GetNodeByMap( i ); - if ( node->GetState() == State_Up) - { - strncpy(nodeInfo[i].nodeName, node->GetName(), - sizeof(nodeInfo[i].nodeName)); - strncpy(nodeInfo[i].commPort, node->GetCommPort(), - sizeof(nodeInfo[i].commPort)); - strncpy(nodeInfo[i].syncPort, node->GetSyncPort(), - sizeof(nodeInfo[i].syncPort)); - nodeInfo[i].pnid = node->GetPNid(); - nodeInfo[i].creatorPNid = (nodeInfo[i].pnid == MyPNID) ? MyPNID : -1; - - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - Node info for pnid=%d (%s)\n" - " CommPort=%s\n" - " SyncPort=%s\n" - " creatorPNid=%d\n" - , method_name, __LINE__ - , nodeInfo[i].pnid - , nodeInfo[i].nodeName - , nodeInfo[i].commPort - , nodeInfo[i].syncPort - , nodeInfo[i].creatorPNid ); - } - } - else - { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - No nodeInfo[%d] for pnid=%d (%s) node not up!\n" - , method_name, __LINE__ - , i, node->GetPNid(), node->GetName()); - } - - nodeInfo[i].pnid = -1; - nodeInfo[i].nodeName[0] = '\0'; - nodeInfo[i].commPort[0] = '\0'; - nodeInfo[i].syncPort[0] = '\0'; - nodeInfo[i].creatorPNid = -1; - } - } - - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - Sending port info to new monitor\n" - , method_name, __LINE__); - for (int i=0; i<pnodeCount; i++) - { - trace_printf( "Port info for pnid=%d\n" - " nodeInfo[%d].nodeName=%s\n" - " nodeInfo[%d].commPort=%s\n" - " nodeInfo[%d].syncPort=%s\n" - " nodeInfo[%d].creatorPNid=%d\n" - , nodeInfo[i].pnid - , i, nodeInfo[i].nodeName - , i, nodeInfo[i].commPort - , i, nodeInfo[i].syncPort - , i, nodeInfo[i].creatorPNid ); - } - } - - rc = Monitor->SendSock( (char *) nodeInfo - , nodeInfoSize - , sockFd - , method_name ); - if ( rc ) - { - char buf[MON_STRING_BUF_SIZE]; - snprintf(buf, sizeof(buf), "[%s], cannot send node/port info to " - " new monitor process: %s.\n" - , method_name, ErrorMsg(rc)); - mon_log_write(MON_COMMACCEPT_2, SQ_LOG_ERR, buf); - - sentData = false; - } - - delete [] nodeInfo; - - TRACE_EXIT; - return sentData; -} - -void CPtpCommAccept::processNewSock( int joinFd ) +void CPtpCommAccept::processNewSock( int sockFd ) { const char method_name[] = "CPtpCommAccept::processNewSock"; TRACE_ENTRY; struct internal_msg_def msg; - nodeId_t nodeId; int rc; - // Get info about connecting monitor - rc = Monitor->ReceiveSock( (char *) &nodeId - , sizeof(nodeId_t) - , joinFd - , method_name ); - if ( rc ) - { // Handle error - close( joinFd ); - char buf[MON_STRING_BUF_SIZE]; - snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " - "monitor: %s.\n", method_name, ErrorMsg(rc)); - mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf); - return; - } - -/* - printf( "TRK %s@%d - Accepted connection from pnid=%d\n" - " nodeId.nodeName=%s\n" - " nodeId.commPort=%s\n" - " nodeId.syncPort=%s\n" - " nodeId.creatorPNid=%d\n" - " nodeId.creator=%d\n" - " nodeId.creatorShellPid=%d\n" - " nodeId.creatorShellVerifier=%d\n" - " nodeId.ping=%d\n" - , method_name, __LINE__ - , nodeId.pnid - , nodeId.nodeName - , nodeId.commPort - , nodeId.syncPort - , nodeId.creatorPNid - , nodeId.creator - , nodeId.creatorShellPid - , nodeId.creatorShellVerifier - , nodeId.ping ); - */ mem_log_write(CMonLog::MON_CONNTONEWMON_2); int size; - rc = Monitor->ReceiveSock( (char *) &size, sizeof(size), joinFd, method_name ); + rc = Monitor->ReceiveSock( (char *) &size, sizeof(size), sockFd, method_name ); if ( rc ) { // Handle error - close( joinFd ); + close( sockFd ); char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " "monitor: %s.\n", method_name, ErrorMsg(rc)); @@ -224,15 +86,14 @@ void CPtpCommAccept::processNewSock( int joinFd ) return; } // Get info about connecting monitor - rc = Monitor->ReceiveSock( /*(char *) &nodeId*/ - (char *) &msg + rc = Monitor->ReceiveSock( (char *) &msg , size - , joinFd + , sockFd , method_name ); if ( rc ) { // Handle error - close( joinFd ); + close( sockFd ); char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " "monitor: %s.\n", method_name, ErrorMsg(rc)); @@ -245,41 +106,84 @@ void CPtpCommAccept::processNewSock( int joinFd ) { case InternalType_Process: { - /* printf("\nTRK Received ReqType_NewProcess \n"); - ReqQueue.enqueueNewProcReq( &msg.u.process); - */ - break; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Process\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueNewProcReq( &msg.u.process); + break; } case InternalType_ProcessInit: { - /* printf("\nTRK Received InternalType_ProcessInit \n"); - ReqQueue.enqueueProcInitReq( &msg.u.processInit); - */ - break; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n" + , method_name, __LINE__ ); + } + if ( MyNode->IsMyNode(msg.u.processInit.origNid) ) + { // New process request originated on this node + ReqQueue.enqueueProcInitReq( &msg.u.processInit); + } + else + { + abort(); + } + break; } - //TRK-TODO case InternalType_Clone: { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Clone\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueCloneReq( &msg.u.clone ); break; } - //TRK-TODO case InternalType_Open: { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Open\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueOpenReq( &msg.u.open ); break; } - //TRK-TODO case InternalType_Notify: { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Notify\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueNotifyReq( &msg.u.notify ); break; } - //TRK-TODO case InternalType_Exit: { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Exit\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueExitReq( &msg.u.exit ); + break; + } + case InternalType_Kill: + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_Kill\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueKillReq( &msg.u.kill ); break; } default: { - + abort(); } } } @@ -313,7 +217,7 @@ void CPtpCommAccept::commAcceptorSock() const char method_name[] = "CPtpCommAccept::commAcceptorSock"; TRACE_ENTRY; - int joinFd = -1; + int sockFd = -1; if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { @@ -331,7 +235,7 @@ void CPtpCommAccept::commAcceptorSock() } mem_log_write(CMonLog::MON_CONNTONEWMON_1); - joinFd = Monitor->AcceptMon2MonSock(); + sockFd = Monitor->AcceptMon2MonSock(); } else { @@ -354,7 +258,7 @@ void CPtpCommAccept::commAcceptorSock() break; } - if ( joinFd < 0 ) + if ( sockFd < 0 ) { char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n", @@ -363,11 +267,11 @@ void CPtpCommAccept::commAcceptorSock() } else { - processNewSock( joinFd ); + processNewSock( sockFd ); } } - if ( !(joinFd < 0) ) close( joinFd ); + if ( !(sockFd < 0) ) close( sockFd ); if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) trace_printf("%s@%d thread %lx exiting\n", method_name, @@ -504,38 +408,3 @@ void CPtpCommAccept::monReqExec( void *req ) TRACE_EXIT; } - -//TRK-TODO -void CPtpCommAccept::monReqNewProcess( struct message_def* msg, int sockFd ) -{ - const char method_name[] = "CCommAcceptMon::monReqNewProcess"; - TRACE_ENTRY; - sockFd = sockFd; // appease compiler for the time being - - if (trace_settings & (TRACE_REQUEST)) - { - trace_printf( "%s@%d - Received monitor request new-process data.\n" - " msg.new_process_ns.parent_nid=%d\n" - " msg.new_process_ns.parent_pid=%d\n" - " msg.new_process_ns.parent_verifier=%d\n" - " msg.new_process_ns.nid=%d\n" - " msg._nsnew_process.pid=%d\n" - " msg._nsnew_process.verifier=%d\n" - " msg.new_process_ns.type=%d\n" - " msg.new_process_ns.priority=%d\n" - " msg.new_process_ns.process_name=%s\n" - , method_name, __LINE__ - , msg->u.request.u.new_process_ns.parent_nid - , msg->u.request.u.new_process_ns.parent_pid - , msg->u.request.u.new_process_ns.parent_verifier - , msg->u.request.u.new_process_ns.nid - , msg->u.request.u.new_process_ns.pid - , msg->u.request.u.new_process_ns.verifier - , msg->u.request.u.new_process_ns.type - , msg->u.request.u.new_process_ns.priority - , msg->u.request.u.new_process_ns.process_name - ); - } - - TRACE_EXIT; -} http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/ptpcommaccept.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpcommaccept.h b/core/sqf/monitor/linux/ptpcommaccept.h index 617d6d7..ca58139 100644 --- a/core/sqf/monitor/linux/ptpcommaccept.h +++ b/core/sqf/monitor/linux/ptpcommaccept.h @@ -47,11 +47,9 @@ public: void start( void ); void shutdownWork( void ); - void monReqNewProcess( struct message_def* msg, int sockFd ); private: void commAcceptorSock( void ); - bool sendNodeInfoSock( int sockFd ); bool accepting_; bool shutdown_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/replicate.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/replicate.cxx b/core/sqf/monitor/linux/replicate.cxx index 3cd23ec..c22c799 100644 --- a/core/sqf/monitor/linux/replicate.cxx +++ b/core/sqf/monitor/linux/replicate.cxx @@ -701,7 +701,7 @@ bool CReplProcInit::replicate(struct internal_msg_def *&msg) trace_printf("%s@%d - Replicating proc init new process %s, result=%d\n", method_name, __LINE__, name_, result_); - // Build message to replicate process initializationdata other nodes + // Build message to replicate process initialization data other nodes msg->type = InternalType_ProcessInit; msg->u.processInit.nid = nid_; msg->u.processInit.pid = pid_; @@ -739,9 +739,9 @@ CReplClone::CReplClone(CProcess *process) : process_(process) if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_PROCESS_DETAIL)) { const char method_name[] = "CReplClone::CReplClone"; - trace_printf("%s@%d - Queuing replicate process %s (%d, %d:%d)\n", + trace_printf("%s@%d - Queuing replicate process %s (%d, %d:%d), port=%s\n", method_name, __LINE__, process_->GetName(), process_->GetNid(), - process_->GetPid(), process_->GetVerifier()); + process_->GetPid(), process_->GetVerifier(), process_->GetPort()); } // Increment reference count for process object @@ -834,8 +834,28 @@ bool CReplClone::replicate(struct internal_msg_def *&msg) // temp trace if (trace_settings & TRACE_PROCESS) { - trace_printf("%s@%d - replSize_=%d, programStrId=(%d,%d), pathStrId=(%d,%d), ldPathStrId=(%d,%d), name=%s, strlen(name)=%d, port=%s, strlen(port)=%d, infile=%s, strlen(infile)=%d, outfile=%s, strlen(outfile)=%d, argc=%d, strlen(total argv)=%d, args=[%.*s]\n", - method_name, __LINE__, replSize_, msg->u.clone.programStrId.nid, msg->u.clone.programStrId.id, msg->u.clone.pathStrId.nid, msg->u.clone.pathStrId.id, msg->u.clone.ldpathStrId.nid, msg->u.clone.ldpathStrId.id, &msg->u.clone.stringData, nameLen_, &msg->u.clone.stringData+nameLen_, portLen_, &msg->u.clone.stringData+nameLen_+portLen_, infileLen_, &msg->u.clone.stringData+nameLen_+portLen_+infileLen_, outfileLen_, msg->u.clone.argc, argvLen_, argvLen_, &msg->u.clone.stringData+nameLen_+portLen_+infileLen_+outfileLen_); + trace_printf( "%s@%d - replSize_=%d\n" + " msg->u.clone.programStrId=(%d,%d)\n" + " msg->u.clone.pathStrId=(%d,%d)\n" + " msg->u.clone.ldPathStrId=(%d,%d)\n" + " msg->u.clone.name=%s, strlen(name)=%d\n" + " msg->u.clone.port=%s, strlen(port)=%d\n" + " msg->u.clone.infile=%s, strlen(infile)=%d\n" + " msg->u.clone.outfile=%s, strlen(outfile)=%d\n" + " msg->u.clone.argc=%d, strlen(total argv)=%d, args=[%.*s]\n" + , method_name, __LINE__, replSize_ + , msg->u.clone.programStrId.nid + , msg->u.clone.programStrId.id + , msg->u.clone.pathStrId.nid + , msg->u.clone.pathStrId.id + , msg->u.clone.ldpathStrId.nid + , msg->u.clone.ldpathStrId.id + , &msg->u.clone.stringData, nameLen_ + , &msg->u.clone.stringData+nameLen_, portLen_ + , &msg->u.clone.stringData+nameLen_+portLen_, infileLen_ + , &msg->u.clone.stringData+nameLen_+portLen_+infileLen_, outfileLen_ + , msg->u.clone.argc + , argvLen_, argvLen_, &msg->u.clone.stringData+nameLen_+portLen_+infileLen_+outfileLen_); } // Advance sync buffer pointer http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/reqkill.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqkill.cxx b/core/sqf/monitor/linux/reqkill.cxx index 40580c4..ee88a19 100644 --- a/core/sqf/monitor/linux/reqkill.cxx +++ b/core/sqf/monitor/linux/reqkill.cxx @@ -30,6 +30,7 @@ #include "monlogging.h" #include "replicate.h" #include "mlio.h" +#include "ptpclient.h" extern CMonitor *Monitor; extern CMonStats *MonStats; @@ -37,6 +38,8 @@ extern CNodeContainer *Nodes; extern CReplicate Replicator; extern CNode *MyNode; extern int MyPNID; +extern CPtpClient *PtpClient; +extern bool NameServerEnabled; CExtKillReq::CExtKillReq (reqQueueMsg_t msgType, int pid, struct message_def *msg ) @@ -88,12 +91,23 @@ void CExtKillReq::Kill( CProcess *process ) if ( (node->GetState() == State_Up || node->GetState() == State_Shutdown) && node->GetPNid() != MyPNID ) { - // Replicate the kill to other nodes - CReplKill *repl = new CReplKill( process->GetNid() - , process->GetPid() - , process->GetVerifier() - , process->GetAbort()); - Replicator.addItem(repl); + if (NameServerEnabled) + { + // Forward the process create to the target node + PtpClient->ProcessKill( process + , process->GetAbort() + , lnode->GetNid() + , node->GetName()); + } + else + { + // Replicate the kill to other nodes + CReplKill *repl = new CReplKill( process->GetNid() + , process->GetPid() + , process->GetVerifier() + , process->GetAbort()); + Replicator.addItem(repl); + } } else { http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/reqnewproc.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqnewproc.cxx b/core/sqf/monitor/linux/reqnewproc.cxx index 04f1f26..5ebfec5 100644 --- a/core/sqf/monitor/linux/reqnewproc.cxx +++ b/core/sqf/monitor/linux/reqnewproc.cxx @@ -527,15 +527,18 @@ void CExtNewProcReq::performRequest() { process->userArgs ( msg_->u.request.u.new_process.argc, msg_->u.request.u.new_process.argv ); - // Replicate the process to other nodes - -//TRK-TODO - /* if (NameServerEnabled) +#ifndef NAMESERVER_PROCESS + if (NameServerEnabled) { - PtpClient->NewProcess(process, lnode->GetNid(), lnode->GetNode()->GetName()); + // Forward the process create to the target node + PtpClient->ProcessNew( process + , lnode->GetNid() + , lnode->GetNode()->GetName()); } else - */ { +#endif + { + // Replicate the process to other nodes CReplProcess *repl = new CReplProcess(process); Replicator.addItem(repl); } @@ -558,19 +561,6 @@ void CExtNewProcReq::performRequest() strcpy(msg_->u.reply.u.new_process.process_name,process->GetName()); msg_->u.reply.u.new_process.return_code = MPI_SUCCESS; } -#ifdef QUICK_WAITED_NEWPROCESS_REPLY - else if (process->GetPid() != -1) - { // Process was created locally, reply now. The process - // was created but the process startup message has not yet - // arrived. - msg_->u.reply.type = ReplyType_NewProcess; - msg_->u.reply.u.new_process.nid = process->GetNid(); - msg_->u.reply.u.new_process.pid = process->GetPid(); - msg_->u.reply.u.new_process.verifier = process->GetVerifier(); - strcpy(msg_->u.reply.u.new_process.process_name,process->GetName()); - msg_->u.reply.u.new_process.return_code = MPI_SUCCESS; - } -#endif else { // we will not reply at this time ... but wait for the child process to
