http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/ptpclient.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h index 87f2315..e6ddeb4 100644 --- a/core/sqf/monitor/linux/ptpclient.h +++ b/core/sqf/monitor/linux/ptpclient.h @@ -69,6 +69,16 @@ public: , CProcess *targetProcess , int targetNid , const char *targetNodeName ); + int StdInReq( int nid + , int pid + , StdinReqType type + , int supplierNid + , int supplierPid ); + int StdIoData( int nid + , int pid + , StdIoType type + , ssize_t count + , char *data ); private:
http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/ptpcommaccept.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx index fa21dc0..d380d3a 100644 --- a/core/sqf/monitor/linux/ptpcommaccept.cxx +++ b/core/sqf/monitor/linux/ptpcommaccept.cxx @@ -23,9 +23,10 @@ // /////////////////////////////////////////////////////////////////////////////// - using namespace std; +#include <stdio.h> +#include "redirector.h" #include "ptpcommaccept.h" #include "monlogging.h" #include "montrace.h" @@ -33,11 +34,13 @@ using namespace std; #include "reqqueue.h" +extern CRedirector Redirector; extern CReqQueue ReqQueue; extern CPtpCommAccept PtpCommAccept; extern CMonitor *Monitor; extern CNode *MyNode; extern CNodeContainer *Nodes; +extern CRedirector Redirector; extern int MyPNID; extern char MyPtPPort[MPI_MAX_PORT_NAME]; extern char *ErrorMsg (int error_code); @@ -46,9 +49,9 @@ extern CommType_t CommType; CPtpCommAccept::CPtpCommAccept() - : accepting_(true) - , shutdown_(false) - , thread_id_(0) + : accepting_(true) + , shutdown_(false) + , thread_id_(0) { const char method_name[] = "CPtpCommAccept::CPtpCommAccept"; TRACE_ENTRY; @@ -82,7 +85,7 @@ void CPtpCommAccept::processNewSock( int sockFd ) char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " "monitor: %s.\n", method_name, ErrorMsg(rc)); - mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf); return; } // Get info about connecting monitor @@ -97,7 +100,7 @@ void CPtpCommAccept::processNewSock( int sockFd ) char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " "monitor: %s.\n", method_name, ErrorMsg(rc)); - mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf); return; } else @@ -191,6 
+194,26 @@ void CPtpCommAccept::processNewSock( int sockFd ) ReqQueue.enqueueKillReq( &msg.u.kill ); break; } + case InternalType_IoData: + { + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_IoData\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueIoDataReq( &msg.u.iodata ); + break; + } + case InternalType_StdinReq: + { + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - Received InternalType_StdinReq\n" + , method_name, __LINE__ ); + } + ReqQueue.enqueueStdInReq( &msg.u.stdin_req ); + break; + } default: { abort(); @@ -273,7 +296,7 @@ void CPtpCommAccept::commAcceptorSock() char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n", method_name, strerror(errno)); - mon_log_write(MON_COMMACCEPT_16, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf); } else { @@ -330,7 +353,7 @@ static void *ptpCommAccept(void *arg) char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n", method_name, rc); - mon_log_write(MON_COMMACCEPT_17, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf); } // Enter thread processing loop @@ -353,7 +376,7 @@ void CPtpCommAccept::start() char buf[MON_STRING_BUF_SIZE]; snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n", method_name, rc); - mon_log_write(MON_COMMACCEPT_18, SQ_LOG_ERR, buf); + mon_log_write(PTP_COMMACCEPT_8, SQ_LOG_ERR, buf); } TRACE_EXIT; http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/redirector.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/redirector.cxx b/core/sqf/monitor/linux/redirector.cxx index 54710bc..43bb231 100644 --- a/core/sqf/monitor/linux/redirector.cxx +++ b/core/sqf/monitor/linux/redirector.cxx @@ -58,6 +58,7 @@ using namespace std; #include "replicate.h" #include "monsonar.h" #include 
"reqqueue.h" +#include "ptpclient.h" #endif #ifndef NAMESERVER_PROCESS @@ -70,6 +71,8 @@ extern CNodeContainer *Nodes; extern CReplicate Replicator; extern CMonStats *MonStats; extern CReqQueue ReqQueue; +extern CPtpClient *PtpClient; +extern bool NameServerEnabled; #endif const char *EpollEventString( __uint32_t events ) @@ -662,10 +665,23 @@ int CRedirectAncestorStdin::handleInput() reqType = STDIN_FLOW_ON; } - CReplStdinReq *repl - = new CReplStdinReq(MyPNID, pid_, reqType, ancestorNid_, ancestorPid_ ); - Replicator.addItem(repl); - + if (NameServerEnabled) + { + PtpClient->StdInReq( MyPNID + , pid_ + , reqType + , ancestorNid_ + , ancestorPid_ ); + } + else + { + CReplStdinReq *repl = new CReplStdinReq( MyPNID + , pid_ + , reqType + , ancestorNid_ + , ancestorPid_ ); + Replicator.addItem(repl); + } } TRACE_EXIT; @@ -856,9 +872,24 @@ void CRedirectStdinRemote::handleOutput(ssize_t count, char *buffer) (int)count); } - CReplStdioData *repl - = new CReplStdioData(requesterNid_, pid_, STDIN_DATA, count, buffer ); - Replicator.addItem(repl); + if (NameServerEnabled) + { + PtpClient->StdIoData( requesterNid_ + , pid_ + , STDIN_DATA + , count + , buffer ); + } + else + { + CReplStdioData *repl = new CReplStdioData( requesterNid_ + , pid_ + , STDIN_DATA + , count + , buffer ); + Replicator.addItem(repl); + } + if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) MonStats->StdinRemoteDataReplIncr(); @@ -1144,10 +1175,24 @@ void CRedirectAncestorStdout::handleOutput(ssize_t count, char *buffer) (int)count); } - CReplStdioData *repl - = new CReplStdioData(ancestor_nid_, ancestor_pid_, STDOUT_DATA, - count, buffer ); - Replicator.addItem(repl); + if (NameServerEnabled) + { + PtpClient->StdIoData( ancestor_nid_ + , ancestor_pid_ + , STDOUT_DATA + , count + , buffer ); + } + else + { + CReplStdioData *repl = new CReplStdioData( ancestor_nid_ + , ancestor_pid_ + , STDOUT_DATA + , count + , buffer ); + Replicator.addItem(repl); + } + if 
(sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) MonStats->StdioDataReplIncr(); @@ -1607,10 +1652,23 @@ void CRedirector::stdinFd(int nid, int pid, int &pipeFd, char filename[], fdMap_.insert(std::make_pair(pipeFd, redirect)); fdMapLock_.unlock(); - CReplStdinReq *repl - = new CReplStdinReq(nid, pid, STDIN_REQ_DATA, ancestor_nid, - ancestor_pid ); - Replicator.addItem(repl); + if (NameServerEnabled) + { + PtpClient->StdInReq( nid + , pid + , STDIN_REQ_DATA + , ancestor_nid + , ancestor_pid ); + } + else + { + CReplStdinReq *repl = new CReplStdinReq( nid + , pid + , STDIN_REQ_DATA + , ancestor_nid + , ancestor_pid ); + Replicator.addItem(repl); + } } TRACE_EXIT; http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/replicate.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/replicate.cxx b/core/sqf/monitor/linux/replicate.cxx index 0b6fadb..f9ebf53 100644 --- a/core/sqf/monitor/linux/replicate.cxx +++ b/core/sqf/monitor/linux/replicate.cxx @@ -71,6 +71,9 @@ void CReplObj::validateObj() #ifdef NAMESERVER_PROCESS struct dummy_sizeof_def {}; #endif +#ifndef EXCHANGE_CPU_SCHEDULING_DATA +struct dummy1_sizeof_def {}; +#endif // Determine the maximum size of a replication object (excluding CReplEvent) int CReplObj::calcAllocSize() @@ -81,7 +84,11 @@ int CReplObj::calcAllocSize() sizeof(CReplNodeDelete)), sizeof(CReplSoftNodeUp)), sizeof(CReplSoftNodeDown)), +#ifdef EXCHANGE_CPU_SCHEDULING_DATA sizeof(CReplSchedData)), +#else + sizeof(dummy1_sizeof_def)), +#endif sizeof(CReplActivateSpare)), sizeof(CReplConfigData)), sizeof(CReplOpen)), @@ -1789,6 +1796,7 @@ bool CReplNodeName::replicate(struct internal_msg_def *&msg) return true; } +#ifdef EXCHANGE_CPU_SCHEDULING_DATA CReplSchedData::CReplSchedData() { // Add eyecatcher sequence as a debugging aid @@ -1865,6 +1873,7 @@ bool CReplSchedData::replicate(struct internal_msg_def *&msg) return true; } +#endif 
CReplNodeUp::CReplNodeUp(int pnid) : pnid_(pnid) http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/replicate.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/replicate.h b/core/sqf/monitor/linux/replicate.h index 5d4a7cc..3ae0909 100644 --- a/core/sqf/monitor/linux/replicate.h +++ b/core/sqf/monitor/linux/replicate.h @@ -424,6 +424,7 @@ private: }; +#ifdef EXCHANGE_CPU_SCHEDULING_DATA class CReplSchedData: public CReplObj { public: @@ -435,6 +436,7 @@ public: private: }; +#endif #ifndef NAMESERVER_PROCESS class CReplStdioData: public CReplObj http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqdump.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqdump.cxx b/core/sqf/monitor/linux/reqdump.cxx index 159219a..4e56bb5 100644 --- a/core/sqf/monitor/linux/reqdump.cxx +++ b/core/sqf/monitor/linux/reqdump.cxx @@ -75,7 +75,7 @@ void CExtDumpReq::performRequest() CProcess *target; CProcess *requester; - CLNode *node; + CLNode *lnode; string target_process_name; int target_nid = -1; int target_pid = -1; @@ -144,9 +144,10 @@ void CExtDumpReq::performRequest() method_name, __LINE__, target->GetName(), target->GetNid(), target->GetPid()); target->parentContext(msg_); - if (node->Dump_Process(requester, - target, - msg_->u.request.u.dump.path) != SUCCESS) + lnode = Nodes->GetLNode(target_nid); + if (lnode->Dump_Process(requester, + target, + msg_->u.request.u.dump.path) != SUCCESS) rc = MPI_ERR_SPAWN; } else http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqnewproc.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqnewproc.cxx b/core/sqf/monitor/linux/reqnewproc.cxx index b5f9ec1..a6da01e 100644 --- a/core/sqf/monitor/linux/reqnewproc.cxx +++ b/core/sqf/monitor/linux/reqnewproc.cxx @@ 
-461,6 +461,7 @@ void CExtNewProcReq::performRequest() programStrId, msg_->u.request.u.new_process.infile, msg_->u.request.u.new_process.outfile + , 0 // tag , result ); if ( process ) @@ -468,7 +469,7 @@ void CExtNewProcReq::performRequest() process->userArgs ( msg_->u.request.u.new_process.argc, msg_->u.request.u.new_process.argv ); } - if ( process && process->Create(process->GetParent(), result)) + if ( process && process->Create(process->GetParent(), 0, result)) { MyNode->AddToNameMap(process); MyNode->AddToPidMap(process->GetPid(), process); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqprocinfo.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqprocinfo.cxx b/core/sqf/monitor/linux/reqprocinfo.cxx index 566a92b..84dc3a7 100644 --- a/core/sqf/monitor/linux/reqprocinfo.cxx +++ b/core/sqf/monitor/linux/reqprocinfo.cxx @@ -634,99 +634,118 @@ void CExtProcInfoContReq::performRequest() TRACE_ENTRY; #ifndef NAMESERVER_PROCESS - if ( NameServerEnabled ) + bool getMonitorInfo = false; + if (strcasecmp(msg_->u.request.u.process_info.target_process_name, "MONITOR") == 0) + { + getMonitorInfo = true; + msg_->u.request.u.process_info.target_process_name[0] = 0; + } + + if ( NameServerEnabled && !getMonitorInfo ) NameServer->ProcessInfoCont(msg_); // in reqQueue thread (CExternalReq) #endif - int count = 0; - int nid; - int pid; - - // Record statistics (sonar counters) - if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) - MonStats->req_type_processinfocont_Incr(); - - if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) +#ifndef NAMESERVER_PROCESS + if ( NameServerEnabled && !getMonitorInfo ) { - trace_printf("%s@%d request #%ld: ProcessInfoCont, context (%d, %d), " - "process type=%d, allnodes=%d\n", method_name, __LINE__, - id_, - msg_->u.request.u.process_info_cont.context[0].nid, - msg_->u.request.u.process_info_cont.context[0].pid, - 
msg_->u.request.u.process_info_cont.type, - msg_->u.request.u.process_info_cont.allNodes); + // Send reply to requester + lioreply(msg_, pid_); } + else + { +#endif + int count = 0; + int nid; + int pid; - msg_->u.reply.u.process_info.more_data = false; - - // Using context from the last reply, locate next process. - // Generally the final process in the last reply will still exist - // so we locate its CProcess object for continuation. If that - // process no longer exists we try to find other processes in the - // context list until we find the CProcess object or run out of - // context. - int i = -1; - CProcess *process = 0; + // Record statistics (sonar counters) + if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) + MonStats->req_type_processinfocont_Incr(); - while (!process && ++i < MAX_PROC_CONTEXT) - { - nid = msg_->u.request.u.process_info_cont.context[i].nid; - pid = msg_->u.request.u.process_info_cont.context[i].pid; - if (nid >= 0 && nid < Nodes->GetLNodesConfigMax()) + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) { - process = Nodes->GetLNode(nid)->GetProcessL(pid); + trace_printf("%s@%d request #%ld: ProcessInfoCont, context (%d, %d), " + "process type=%d, allnodes=%d\n", method_name, __LINE__, + id_, + msg_->u.request.u.process_info_cont.context[0].nid, + msg_->u.request.u.process_info_cont.context[0].pid, + msg_->u.request.u.process_info_cont.type, + msg_->u.request.u.process_info_cont.allNodes); } - } + msg_->u.reply.u.process_info.more_data = false; - if (!process) - { // Could not locate any process in the context list. So - // begin with the first process in the node. - nid = msg_->u.request.u.process_info_cont.context[0].nid; - if (trace_settings & TRACE_REQUEST) - trace_printf("%s@%d" " could not find context process, restarting for node=" "%d" "\n", method_name, __LINE__, nid); - if (nid >= 0 && nid < Nodes->GetLNodesConfigMax()) + // Using context from the last reply, locate next process. 
+ // Generally the final process in the last reply will still exist + // so we locate its CProcess object for continuation. If that + // process no longer exists we try to find other processes in the + // context list until we find the CProcess object or run out of + // context. + int i = -1; + CProcess *process = 0; + + while (!process && ++i < MAX_PROC_CONTEXT) { - process = ProcessInfo_GetProcess (nid, msg_->u.request.u.process_info_cont.allNodes); + nid = msg_->u.request.u.process_info_cont.context[i].nid; + pid = msg_->u.request.u.process_info_cont.context[i].pid; + if (nid >= 0 && nid < Nodes->GetLNodesConfigMax()) + { + process = Nodes->GetLNode(nid)->GetProcessL(pid); + } } - } - // Assuming we found a CProcess object resume returning data with - // the subsequent process. - if (process) - { - process = process->GetNextL(); + if (!process) - { // We were at the last process on the node. Get first process - // on the next node (if there is a next node). - if (++nid < Nodes->GetLNodesConfigMax()) + { // Could not locate any process in the context list. So + // begin with the first process in the node. + nid = msg_->u.request.u.process_info_cont.context[0].nid; + if (trace_settings & TRACE_REQUEST) + trace_printf("%s@%d" " could not find context process, restarting for node=" "%d" "\n", method_name, __LINE__, nid); + if (nid >= 0 && nid < Nodes->GetLNodesConfigMax()) { - process = ProcessInfo_GetProcess(nid, - msg_->u.request.u.process_info_cont.allNodes); + process = ProcessInfo_GetProcess (nid, msg_->u.request.u.process_info_cont.allNodes); } } + // Assuming we found a CProcess object resume returning data with + // the subsequent process. if (process) { - count = ProcessInfo_BuildReply( - process, - msg_, - msg_->u.request.u.process_info_cont.type, - msg_->u.request.u.process_info_cont.allNodes, - (char *) ""); + process = process->GetNextL(); + if (!process) + { // We were at the last process on the node. 
Get first process + // on the next node (if there is a next node). + if (++nid < Nodes->GetLNodesConfigMax()) + { + process = ProcessInfo_GetProcess(nid, + msg_->u.request.u.process_info_cont.allNodes); + } + } + + if (process) + { + count = ProcessInfo_BuildReply( + process, + msg_, + msg_->u.request.u.process_info_cont.type, + msg_->u.request.u.process_info_cont.allNodes, + (char *) ""); + } } - } - msg_->u.reply.type = ReplyType_ProcessInfo; - msg_->u.reply.u.process_info.num_processes = count; - msg_->u.reply.u.process_info.return_code = MPI_SUCCESS; + msg_->u.reply.type = ReplyType_ProcessInfo; + msg_->u.reply.u.process_info.num_processes = count; + msg_->u.reply.u.process_info.return_code = MPI_SUCCESS; #ifdef NAMESERVER_PROCESS - monreply(msg_, sockFd_); + monreply(msg_, sockFd_); #else - // Send reply to requester - lioreply(msg_, pid_); + // Send reply to requester + lioreply(msg_, pid_); +#endif +#ifndef NAMESERVER_PROCESS + } #endif TRACE_EXIT; http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqqueue.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqqueue.cxx b/core/sqf/monitor/linux/reqqueue.cxx index ac559c2..e97c0a7 100644 --- a/core/sqf/monitor/linux/reqqueue.cxx +++ b/core/sqf/monitor/linux/reqqueue.cxx @@ -23,6 +23,7 @@ // /////////////////////////////////////////////////////////////////////////////// +#include <stddef.h> #include <stdio.h> #include <zlib.h> #include "reqqueue.h" @@ -40,6 +41,7 @@ #include "internal.h" #include "healthcheck.h" #ifndef NAMESERVER_PROCESS +#include "redirector.h" #include "nameserver.h" #include "ptpclient.h" #endif @@ -59,6 +61,7 @@ extern CHealthCheck HealthCheck; #ifdef NAMESERVER_PROCESS extern char *ErrorMsg (int error_code); #else +extern CRedirector Redirector; extern bool NameServerEnabled; extern CPtpClient *PtpClient; extern CNameServer *NameServer; @@ -69,6 +72,10 @@ extern int req_type_startup; extern 
bool IAmIntegrating; extern bool IAmIntegrated; +extern bool IsRealCluster; +extern bool IsAgentMode; +extern bool IsMaster; +extern bool ZClientEnabled; extern CommType_t CommType; extern bool IsRealCluster; @@ -1247,6 +1254,93 @@ void CIntExitNsReq::performRequest() #endif #ifndef NAMESERVER_PROCESS +CIntIoDataReq::CIntIoDataReq( ioData_t *ioData ) + : CInternalReq() + , nid_( ioData->nid ) + , pid_( ioData->pid ) + , verifier_( ioData->verifier ) + , ioType_( ioData->ioType ) + , length_( ioData->length ) +{ + // Add eyecatcher sequence as a debugging aid + memcpy(&eyecatcher_, "RqIK", 4); + memcpy(data_, ioData->data, (length_<=MAX_SYNC_DATA)?length_:MAX_SYNC_DATA); +} + +CIntIoDataReq::~CIntIoDataReq() +{ + // Alter eyecatcher sequence as a debugging aid to identify deleted object + memcpy(&eyecatcher_, "rQik", 4); +} + +void CIntIoDataReq::populateRequestString( void ) +{ + char strBuf[MON_STRING_BUF_SIZE/2]; + sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), type=%d, length=%d" + , CReqQueue::intReqType[InternalType_IoData] + , getId(), nid_, pid_, verifier_, ioType_, length_ ); + requestString_.assign( strBuf ); +} + +void CIntIoDataReq::performRequest() +{ + const char method_name[] = "CIntIoDataReq::performRequest"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf( "%s@%d" " - IO data " + "to (%d,%d:%d), count=%d\n(%s)" + , method_name, __LINE__ + , nid_ + , pid_ + , verifier_ + , length_ + , length_?data_:"\n" ); + } + if ( MyNode->IsMyNode( nid_ ) ) + { + if (trace_settings & (TRACE_SYNC | TRACE_REDIRECTION)) + trace_printf( "%s@%d - processing IO Data for (%d, %d:%d)\n" + , method_name, __LINE__ + , nid_, pid_, verifier_ ); + + CLNode *lnode; + lnode = Nodes->GetLNode( nid_ ); + if ( lnode ) + { + CProcess *process; + process = lnode->GetProcessL( pid_ ); + if (process) + { + int fd; + if (ioType_ == STDIN_DATA) + { + fd = process->FdStdin(); + } + else + { + fd = process->FdStdout(); + 
} + Redirector.disposeIoData( fd, length_, data_ ); + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find process nid" + "=%d, pid=%d for processing IO Data.\n" + , method_name, nid_, pid_ ); + mon_log_write(MON_REQ_IODATA_1, SQ_LOG_ERR, buf); + } + } + } + + TRACE_EXIT; +} +#endif + +#ifndef NAMESERVER_PROCESS CIntKillReq::CIntKillReq( struct kill_def *killDef ) : CInternalReq() , nid_( killDef->nid ) @@ -1527,8 +1621,17 @@ void CIntNewProcReq::performRequest() programStrId_, &stringData_[nameLen_], // infile &stringData_[nameLen_ + infileLen_], // outfile + reqTag_, result); -#ifndef NAMESERVER_PROCESS +#ifdef NAMESERVER_PROCESS + if ( newProcess == NULL ) + { + char buf[MON_STRING_BUF_SIZE]; + sprintf( buf, "[%s], Can't create process %s (%d,%d:%d)\n" + , method_name, &stringData_[0],nid_, pid_, verifier_ ); + mon_log_write(MON_INTREQ_NEWPROC_1, SQ_LOG_ERR, buf); + } +#else if ( newProcess != NULL ) { newProcess->userArgs ( argc_, argvLen_, @@ -1536,27 +1639,19 @@ void CIntNewProcReq::performRequest() + outfileLen_] ); // Create the new process (fork/exec) - if (newProcess->Create(newProcess->GetParent(), result)) + if (newProcess->Create(newProcess->GetParent(), reqTag_, result)) { MyNode->AddToNameMap( newProcess ); MyNode->AddToPidMap( newProcess->GetPid(), newProcess ); - if (NameServerEnabled) - { - // Send actual pid and process name back to parent - PtpClient->ProcessInit( newProcess - , reqTag_ - , 0 - , parentNid_ ); - } - else + if (!NameServerEnabled) { // Successfully forked process. Replicate actual process // id and process name. 
CReplProcInit *repl = new CReplProcInit(newProcess, reqTag_, 0, parentNid_); Replicator.addItem(repl); - } + } } else { @@ -1592,7 +1687,7 @@ void CIntNewProcReq::performRequest() sprintf(buf, "[%s], Can't find parent process nid=%d, pid=%d " "for process create.\n", method_name, parentNid_, parentPid_ ); - mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_10, SQ_LOG_ERR, buf); + mon_log_write(MON_INTREQ_NEWPROC_2, SQ_LOG_ERR, buf); } TRACE_EXIT; @@ -1998,8 +2093,10 @@ void CIntProcInitReq::performRequest() const char method_name[] = "CIntProcInitReq::performRequest"; TRACE_ENTRY; - if (trace_settings & TRACE_SYNC) - trace_printf("%s@%d - processing process init %s (%d, %d), tag %p\n", method_name, __LINE__, name_, nid_, pid_, process_); + if (trace_settings & (TRACE_SYNC | TRACE_PROCESS)) + trace_printf( "%s@%d - processing process init %s (%d, %d), result=%d, tag=%p\n" + , method_name, __LINE__ + , name_, nid_, pid_, result_, static_cast<void*>(process_) ); if ( result_ != 0 ) { // Was unable to create the process, send response to requester @@ -2020,9 +2117,8 @@ void CIntProcInitReq::performRequest() process_->SetName ( name_ ); // Add to pid and name maps - Nodes->GetLNode (process_->GetNid())->GetNode()-> - AddToPidMap(process_->GetPid(), process_); - Nodes->GetLNode (process_->GetNid())->GetNode()->AddToNameMap(process_); + Nodes->GetLNode( process_->GetNid() )->GetNode()->AddToPidMap(process_->GetPid(), process_); + Nodes->GetLNode( process_->GetNid() )->GetNode()->AddToNameMap(process_); if (process_->IsBackup()) { @@ -2032,7 +2128,7 @@ void CIntProcInitReq::performRequest() if (parent) { // Set link from primary process object to // this backup process object. 
- if (trace_settings & TRACE_SYNC) + if (trace_settings & (TRACE_SYNC | TRACE_PROCESS)) { trace_printf("%s@%d - For backup process (%d, %d)" ", for parent (%d, %d) setting " @@ -2125,6 +2221,127 @@ void CIntSetReq::performRequest() TRACE_EXIT; } +#ifndef NAMESERVER_PROCESS +CIntStdInReq::CIntStdInReq( struct stdin_req_def *stdin_req ) + : CInternalReq() + , nid_( stdin_req->nid ) + , pid_( stdin_req->pid ) + , verifier_( stdin_req->verifier ) + , reqType_( stdin_req->reqType ) + , supplierNid_( stdin_req->supplier_nid ) + , supplierPid_( stdin_req->supplier_pid ) +{ + // Add eyecatcher sequence as a debugging aid + memcpy(&eyecatcher_, "RqIS", 4); +} + +CIntStdInReq::~CIntStdInReq() +{ + // Alter eyecatcher sequence as a debugging aid to identify deleted object + memcpy(&eyecatcher_, "rQis", 4); +} + +void CIntStdInReq::populateRequestString( void ) +{ + char strBuf[MON_STRING_BUF_SIZE/2]; + sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), " + "type=%d, supplier (%d,%d)" + , CReqQueue::intReqType[InternalType_StdinReq] + , getId(), nid_, pid_, verifier_, reqType_ + , supplierNid_, supplierPid_ ); + requestString_.assign( strBuf ); +} + +void CIntStdInReq::performRequest() +{ + const char method_name[] = "CIntStdInReq::performRequest"; + TRACE_ENTRY; + + if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) + { + trace_printf("%s@%d - stdin request from (%d,%d:%d)" + ", type=%d, for supplier (%d,%d)\n" + , method_name, __LINE__ + , nid_ + , pid_ + , verifier_ + , reqType_ + , supplierNid_ + , supplierPid_ ); + } + + if ( !MyNode->IsMyNode( supplierNid_ ) ) + { + return; + } + + CLNode *lnode; + lnode = Nodes->GetLNode( nid_ ); + if ( lnode == NULL ) + { + return; + } + + CProcess *process; + process = lnode->GetProcessL( pid_ ); + if (process) + { + if (reqType_ == STDIN_REQ_DATA) + { + // Set up to forward stdin data to requester. + // Save file descriptor associated with stdin + // so can find the redirector object later. 
+ CProcess *supProcess; + lnode = Nodes->GetLNode( supplierNid_ ); + if ( lnode ) + { + supProcess = lnode->GetProcessL ( supplierPid_ ); + if (supProcess) + { + int fd; + fd = Redirector.stdinRemote( supProcess->infile() + , supplierNid_ + , supplierPid_ ); + process->FdStdin(fd); + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), + "[%s], Can't find supplier process " + "nid=%d, pid=%d for stdin data request.\n" + , method_name + , supplierNid_ + , supplierPid_); + mon_log_write(MON_REQ_STDIN_1, SQ_LOG_ERR, buf); + } + } + } + else if (reqType_ == STDIN_FLOW_OFF) + { + Redirector.stdinOff(process->FdStdin()); + } + else if (reqType_ == STDIN_FLOW_ON) + { + Redirector.stdinOn(process->FdStdin()); + } + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Can't find process nid=%d, " + "pid=%d for stdin data request.\n" + , method_name + , nid_ + , pid_ ); + mon_log_write(MON_REQ_STDIN_2, SQ_LOG_ERR, buf); + } + + TRACE_EXIT; +} +#endif + CIntUniqStrReq::CIntUniqStrReq( int nid, int id, const char *value ) : CInternalReq(), nid_(nid), id_(id) { @@ -3191,9 +3408,7 @@ void CIntReviveReq::performRequest() #ifndef NAMESERVER_PROCESS // unpack the current TM leader Monitor->SetTmLeader( header.tmLeader_ ); -#endif -#ifndef NAMESERVER_PROCESS if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) trace_printf( "%s@%d - TM leader (%d) unpacked\n", method_name, __LINE__ , Monitor->GetTmLeader() ); @@ -4409,7 +4624,17 @@ void CReqQueue::enqueueExitNsReq( struct exit_ns_def *exitDef ) #endif #ifndef NAMESERVER_PROCESS -//void CReqQueue::enqueueKillReq( int nid, int pid, bool abort ) +void CReqQueue::enqueueIoDataReq( ioData_t *ioData ) +{ + CInternalReq * request; + + request = new CIntIoDataReq ( ioData ); + + enqueueReq ( request ); +} +#endif + +#ifndef NAMESERVER_PROCESS void CReqQueue::enqueueKillReq( struct kill_def *killDef ) { CInternalReq * request; @@ -4495,6 +4720,17 @@ void 
CReqQueue::enqueueSetReq( struct set_def *setDef ) enqueueReq ( request ); } +#ifndef NAMESERVER_PROCESS +void CReqQueue::enqueueStdInReq( struct stdin_req_def *stdin_req ) +{ + CInternalReq * request; + + request = new CIntStdInReq ( stdin_req ); + + enqueueReq ( request ); +} +#endif + void CReqQueue::enqueueUniqStrReq( struct uniqstr_def *uniqStrDef ) { CIntUniqStrReq * request; @@ -4549,7 +4785,7 @@ void CReqQueue::enqueueTmReadyReq( int nid ) } #endif -// this function moves the queued requests from revieve queue to the main request queue. +// this function moves the queued requests from revive queue to the main request queue. // it will skip the requests whose seq num is less than the given one. void CReqQueue::processReviveRequests(unsigned long long minSeqNum) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/reqqueue.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/reqqueue.h b/core/sqf/monitor/linux/reqqueue.h index 995912d..34cde08 100644 --- a/core/sqf/monitor/linux/reqqueue.h +++ b/core/sqf/monitor/linux/reqqueue.h @@ -988,6 +988,27 @@ private: #endif #ifndef NAMESERVER_PROCESS +class CIntIoDataReq: public CInternalReq +{ +public: + CIntIoDataReq( ioData_t *ioData ); + virtual ~CIntIoDataReq(); + + void performRequest(); + +private: + void populateRequestString( void ); + + int nid_; + int pid_; + Verifier_t verifier_; + StdIoType ioType_; + int length_; // Length in bytes of Data buffer used + char data_[MAX_SYNC_DATA]; +}; +#endif + +#ifndef NAMESERVER_PROCESS class CIntKillReq: public CInternalReq { public: @@ -1150,6 +1171,27 @@ private: char value_[MAX_VALUE_SIZE_INT]; }; +#ifndef NAMESERVER_PROCESS +class CIntStdInReq: public CInternalReq +{ +public: + CIntStdInReq( struct stdin_req_def *stdin_req ); + virtual ~CIntStdInReq(); + + void performRequest(); + +private: + void populateRequestString( void ); + + int nid_; + int pid_; + Verifier_t 
verifier_; + StdinReqType reqType_; + int supplierNid_; // Node id of process supplying stdin data + int supplierPid_; // Process id of process to supplying stdin data +}; +#endif + class CIntUniqStrReq: public CInternalReq { public: @@ -1506,6 +1548,7 @@ class CReqQueue void enqueueDeleteReq( struct delete_def *deleteDef ); #endif #ifndef NAMESERVER_PROCESS + void enqueueIoDataReq( ioData_t *ioData ); void enqueueKillReq( struct kill_def *killDef ); #endif void enqueueNewProcReq( struct process_def *procDef ); @@ -1515,6 +1558,9 @@ class CReqQueue #endif void enqueueProcInitReq( struct process_init_def *procInitDef ); void enqueueSetReq( struct set_def *setDef ); +#ifndef NAMESERVER_PROCESS + void enqueueStdInReq( struct stdin_req_def *stdin_req ); +#endif void enqueueUniqStrReq( struct uniqstr_def *uniqStrDef ); #ifndef NAMESERVER_PROCESS void enqueueChildDeathReq ( pid_t pid ); @@ -1637,6 +1683,7 @@ private: RQIH CIntShutdownReq RQII CIntProcInitReq RQIJ CIntNodeAddReq + RqIK CIntIoDataReq RQIK CIntKillReq RQIL CIntCloneProcReq RQIM CIntActivateSpareReq @@ -1646,6 +1693,7 @@ private: RQIQ CIntUpReq RQIR CIntReviveReq RQIS CIntSetReq + RqIS CIntStdInReq RQIT CIntNodeDeleteReq RQIU CQuiesceReq RQIV CIntTmReadyReq http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/shell.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx index 20b41e6..51206d3 100644 --- a/core/sqf/monitor/linux/shell.cxx +++ b/core/sqf/monitor/linux/shell.cxx @@ -323,7 +323,7 @@ const char *StateString( STATE state ) str = "Shutdown"; break; case State_Initializing: - str = "Initing"; + str = "Initializing"; break; case State_Merged: str = "Merged"; @@ -3045,7 +3045,6 @@ void listZoneInfo( int nid, int zid ) { int i; int count; - int last_nid = 0; MPI_Status status; if ( gp_local_mon_io->acquire_msg( &msg ) != 0 ) @@ -3098,7 +3097,6 @@ void listZoneInfo( int 
nid, int zid ) if ( msg->u.reply.u.zone_info.node[i].nid != -1 ) { // Display zone node info - last_nid = msg->u.reply.u.zone_info.node[i].nid; // "[%s] ZID PNID State Name\n", MyName); // "[%s] --- ---- -------- --------\n", MyName); printf ("[%s] %3.3d %3.3d %-8s %s\n", @@ -5333,7 +5331,7 @@ int start_process (int *nid, PROCESSTYPE type, char *name, bool debug, int prior count = 0; while (*cmd_tail && count < MAX_ARGS) { - cmd_tail = get_token (cmd_tail, token, &delimiter, MAX_TOKEN, + cmd_tail = get_token (cmd_tail, token, &delimiter, (MAX_ARG_SIZE - 1), false /* equal is not a delim */); strncpy (msg->u.request.u.new_process.argv[count], token, MAX_ARG_SIZE - 1); @@ -7309,6 +7307,9 @@ void node_cmd (char *cmd_tail) sprintf( msgString, "[%s] Node delete is not available with Virtual Nodes!",MyName); write_startup_log( msgString ); printf ("%s\n", msgString); + } + else + { if (ElasticityEnabled) { // <nid> | <node-name> @@ -8152,7 +8153,6 @@ void persist_exec_cmd( char *cmd ) const char method_name[] = "persist_exec_cmd"; char *cmd_tail = cmd; char delimiter; - char *ptr; char token[MAX_TOKEN]; CPersistConfig *persistConfig; @@ -8181,7 +8181,7 @@ void persist_exec_cmd( char *cmd ) if (ClusterConfig.IsConfigReady()) { // Parse cmd to get persist-process-prefix - ptr = get_token (cmd_tail, token, &delimiter); + get_token (cmd_tail, token, &delimiter); if (*token != '\0') { // Get persist process configuration @@ -8266,7 +8266,6 @@ void persist_kill_cmd( char *cmd ) const char method_name[] = "persist_kill_cmd"; char *cmd_tail = cmd; char delimiter; - char *ptr; char token[MAX_TOKEN]; CPersistConfig *persistConfig; @@ -8286,7 +8285,7 @@ void persist_kill_cmd( char *cmd ) if (ClusterConfig.IsConfigReady()) { // Parse cmd to get persist-process-prefix - ptr = get_token (cmd_tail, token, &delimiter); + get_token (cmd_tail, token, &delimiter); if (*token != '\0') { // Get persist process configuration 
http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/system.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/system.cxx b/core/sqf/monitor/linux/system.cxx index ca4b968..49b9878 100644 --- a/core/sqf/monitor/linux/system.cxx +++ b/core/sqf/monitor/linux/system.cxx @@ -586,7 +586,7 @@ void CUtility::GetOutput( char *buf, int bufSize ) bytes = (size <= (bufSize - count)) ? size : (bufSize - count); - count =+ bytes; + count += bytes; /* accumulate the running total; '=+' overwrote it with +bytes */ memcpy( ptr, str.data(), bytes ); - ptr =+ (char *)bytes; + ptr += bytes; /* advance the write cursor past the bytes just copied */ } } http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/tmsync.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/tmsync.cxx b/core/sqf/monitor/linux/tmsync.cxx index b87f0f4..e6e3a76 100644 --- a/core/sqf/monitor/linux/tmsync.cxx +++ b/core/sqf/monitor/linux/tmsync.cxx @@ -42,7 +42,9 @@ using namespace std; #include "tmsync.h" #include "mlio.h" #include "reqqueue.h" +#include "nameserver.h" +extern bool NameServerEnabled; extern int trace_level; extern int MyPNID; extern sigset_t SigSet; @@ -926,6 +928,16 @@ void CTmSync_Container::SendUnsolicitedMessages (void) // Get the TM that initiated the sync request tm = LNode[req->Nid]->GetProcessLByType( ProcessType_DTM ); } + if (!tm && NameServerEnabled) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d - Getting process from Name Server, nid=%d, type=ProcessType_DTM\n" + , method_name, __LINE__, req->Nid ); + } + + tm = Nodes->GetProcessLByTypeNs( req->Nid, ProcessType_DTM ); + } if ( tm ) { // send all TmSync requests data to the local TM processes http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/zclient.cxx ---------------------------------------------------------------------- diff --git 
a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx index 0ca03b1..d2cd0be 100644 --- a/core/sqf/monitor/linux/zclient.cxx +++ b/core/sqf/monitor/linux/zclient.cxx @@ -577,12 +577,19 @@ const char* CZClient::WaitForAndReturnMaster( bool doWait ) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s)\n" - , method_name, __LINE__, masterMonitor.c_str() ); + trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s/%s)\n" + , method_name, __LINE__, masterMonitor.c_str(), nodes.data[0] ); } TRACE_EXIT; return nodes.data[0]; } + else + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d (MasterMonitor) Master Monitor NOT found\n" , method_name, __LINE__); + } + } TRACE_EXIT; return NULL; @@ -734,6 +741,59 @@ int CZClient::GetZNodeData( string &monZnode, string &nodeName, int &pnid ) return( rc ); } +void CZClient::HandleMasterZNode ( void ) +{ + const char method_name[] = "CZClient::HandleMasterZNode"; + TRACE_ENTRY; + + char pathStr[MAX_PROCESSOR_NAME] = { 0 }; + char nodeName[MAX_PROCESSOR_NAME] = { 0 }; + char *tkn = NULL; + char *tknStart = pathStr; + char *tknLast = NULL; + string monZnode; + + monZnode.assign( znodeQueue_.front() ); + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf("%s@%d" " - znodePath=%s, znodeQueue_.size=%ld\n" + , method_name, __LINE__ + , monZnode.c_str(), znodeQueue_.size() ); + } + + znodeQueue_.pop_front(); + + strcpy( pathStr, monZnode.c_str() ); + tknStart++; // skip the first '/' + tkn = strtok( tknStart, "/" ); + do + { + tknLast = tkn; + tkn = strtok( NULL, "/" ); + } + while( tkn != NULL ); + + strcpy( nodeName, tknLast ); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d nodeName=%s\n" + , method_name, __LINE__ + , strlen(nodeName) ? 
nodeName : "" ); + } + + string masterpath = zkRootNode_ + zkRootNodeInstance_ + ZCLIENT_MASTER_ZNODE; + std::size_t found = monZnode.find(masterpath); + // if it is the master node, then call HandleAssignMonitorLeader + if (found!=std::string::npos) + // zookeeper node, assume stale + { + HandleAssignMonitorLeader(nodeName); + } + + TRACE_EXIT; +} + void CZClient::HandleExpiredZNode( void ) { const char method_name[] = "CZClient::HandleExpiredZNode"; @@ -778,13 +838,23 @@ void CZClient::HandleExpiredZNode( void ) , strlen(nodeName) ? nodeName : "" ); } - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) + string masterpath = zkRootNode_ + zkRootNodeInstance_ + ZCLIENT_MASTER_ZNODE; + std::size_t found = monZnode.find(masterpath); + // if it is not the master node, then call HandleNodeExpiration + if (found==std::string::npos) + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) , "[%s], %s was deleted, handling node (%s) as a down node!\n" , method_name, monZnode.c_str(), nodeName ); - mon_log_write(MON_ZCLIENT_CHECKZNODE_1, SQ_LOG_ERR, buf); - - HandleNodeExpiration( nodeName ); + mon_log_write(MON_ZCLIENT_CHECKZNODE_1, SQ_LOG_ERR, buf); + + HandleNodeExpiration( nodeName ); + } + else // zookeeper node, assume stale + { + HandleAssignMonitorLeader(nodeName); + } } else { @@ -1210,6 +1280,11 @@ void CZClient::MonitorZCluster() HandleExpiredZNode(); SetState( ZC_MYZNODE ); } + // we still need to check if the master went down + else + { + HandleMasterZNode(); + } break; case ZC_STOP: StopClusterMonitoring(); @@ -1580,7 +1655,34 @@ void CZClient::TriggerCheck( int type, const char *znodePath ) , ZooConnectionTypeStr( type ) ); } - if ( type == ZOO_CREATED_EVENT ) + // Leader stuff only relevant in agent mode + string masterpath = zkRootNode_ + zkRootNodeInstance_ + ZCLIENT_MASTER_ZNODE; + std::string monZnode(znodePath); + std::size_t found = monZnode.find(masterpath); + // if the path is under the master znode, then call HandleAssignMonitorLeader + + if 
(found!=std::string::npos) + // zookeeper node, assume stale + { + char nodeName[MAX_PROCESSOR_NAME] = { 0 }; + char tempName[MAX_PROCESSOR_NAME] = { 0 }; + char *tkn = NULL; + const char *tknStart = znodePath; + char *tknLast = NULL; + tknStart++; // skip the first '/' + strcpy (tempName, tknStart); + tkn = strtok( tempName, "/" ); + strcpy (tempName, tknStart); + do + { + tknLast = tkn; + tkn = strtok( NULL, "/" ); + } + while( tkn != NULL ); + strcpy( nodeName, tknLast ); + HandleAssignMonitorLeader (nodeName); + } + else if ( type == ZOO_CREATED_EVENT ) { SetState( ZC_ZNODE, znodePath ); } @@ -1778,7 +1880,7 @@ int CZClient::WatchNodeMasterDelete( const char *nodeName ) newpath.str( "" ); newpath << zkRootNode_.c_str() << zkRootNodeInstance_.c_str() - << ZCLIENT_MASTER_ZNODE + << ZCLIENT_MASTER_ZNODE <<"/" << nodeName; string monZnode = newpath.str( ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/zclient.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/zclient.h b/core/sqf/monitor/linux/zclient.h index 916ed11..22cf730 100644 --- a/core/sqf/monitor/linux/zclient.h +++ b/core/sqf/monitor/linux/zclient.h @@ -119,6 +119,7 @@ typedef list<string> ZNodeList_t; // the nodeName passed in expires. 
extern void HandleMyNodeExpiration( void ); extern void HandleNodeExpiration( const char *nodeName ); +extern void HandleAssignMonitorLeader ( const char* failedMaster ); class CZClient : public CLock { @@ -168,6 +169,7 @@ private: int GetZNodeData( string &monZnode, string &nodeName, int &pnid ); ZClientState_t GetState( void ) { CAutoLock lock(getLocker()); return( state_ ); } void HandleExpiredZNode( void ); + void HandleMasterZNode ( void ); int InitializeZClient( void ); bool IsEnabled( void ) { CAutoLock lock(getLocker()); return( enabled_ ); } bool IsCheckCluster( void ) { CAutoLock lock(getLocker()); return( checkCluster_ ); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/linux/zootest.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/zootest.cxx b/core/sqf/monitor/linux/zootest.cxx index 94d2605..4b7a11f 100644 --- a/core/sqf/monitor/linux/zootest.cxx +++ b/core/sqf/monitor/linux/zootest.cxx @@ -55,6 +55,14 @@ int MyPid = -1; CZClient *ZClient = NULL; CMonLog *MonLog = NULL; +void HandleAssignMonitorLeader ( const char* failedMaster ) +{ + const char method_name[] = "HandleAssignMonitorLeader"; + TRACE_ENTRY; + failedMaster = failedMaster; + TRACE_EXIT; +} + void HandleMyNodeExpiration( void ) { const char method_name[] = "HandleMyNodeExpiration"; http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/test/monitor.env ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/test/monitor.env b/core/sqf/monitor/test/monitor.env index 52e2341..25029ce 100644 --- a/core/sqf/monitor/test/monitor.env +++ b/core/sqf/monitor/test/monitor.env @@ -19,47 +19,28 @@ # # @@@ END COPYRIGHT @@@ -# Monitor process run mode: -# -# AGENT - monitor process runs in agent mode versus MPI collective -# -# Uncomment the next three environment variables -#export SQ_MON_RUN_MODE=AGENT -MONITOR_COMM_PORT=23390 
-MONITOR_SYNC_PORT=23380 -# -# NAME-SERVER - to disable process replication and enable name-server -# -# Uncomment the next six environment variables -SQ_NAMESERVER_ENABLED=1 -NS_COMM_PORT=23370 -NS_SYNC_PORT=23360 -NS_M2N_COMM_PORT=23350 -MON2MON_COMM_PORT=23340 -MONITOR_COMM_PORT=23330 - # Uncomment MON_TRACE_ENABLE and specific tracing level to enable # Trafodion monitor process tracing -MON_TRACE_ENABLE=1 -MON_TRACE_EVLOG_MSG=1 -MON_TRACE_INIT=1 -MON_TRACE_RECOVERY=1 -MON_TRACE_REQUEST=1 -MON_TRACE_PROCESS=1 -MON_TRACE_NOTICE=1 -MON_TRACE_NS=1 +#MON_TRACE_ENABLE=1 +#MON_TRACE_EVLOG_MSG=1 +#MON_TRACE_INIT=1 +#MON_TRACE_RECOVERY=1 +#MON_TRACE_REQUEST=1 +#MON_TRACE_PROCESS=1 +#MON_TRACE_NOTICE=1 +#MON_TRACE_NS=1 #MON_TRACE_SYNC=1 # Enable TC_TRACE_* along with MON_TRACE_TRAFCONFIG for more detail #MON_TRACE_TRAFCONFIG=1 #MON_TRACE_MLIO=1 #MON_TRACE_REQUEST_DETAIL=1 -MON_TRACE_PROCESS_DETAIL=1 +#MON_TRACE_PROCESS_DETAIL=1 #MON_TRACE_NOTICE_DETAIL=1 #MON_TRACE_SYNC_DETAIL=1 #MON_TRACE_MLIO_DETAIL=1 -MON_TRACE_MEAS +#MON_TRACE_MEAS=1 #MON_TRACE_TMSYNC=1 #MON_TRACE_STATS=1 #MON_TRACE_ENTRY_EXIT=1 @@ -78,4 +59,3 @@ MON_TRACE_MEAS #TC_TRACE_INIT=1 #TC_TRACE_LOG_MSG=1 #TC_TRACE_ENTRY_EXIT=1 - http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/monitor/test/sqconfig.monitor.virtual ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/test/sqconfig.monitor.virtual b/core/sqf/monitor/test/sqconfig.monitor.virtual index 9ab56e8..ad23ec2 100644 --- a/core/sqf/monitor/test/sqconfig.monitor.virtual +++ b/core/sqf/monitor/test/sqconfig.monitor.virtual @@ -24,9 +24,5 @@ _virtualnodes 6 end node begin name-server -#nodes=0 -nodes=0,1 -#nodes=0,1,2 -#nodes=0,1,2,3 -#nodes=0,1,2,3,4,5 +nodes=0,1,2,3,4,5 end name-server http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sqenvcom.sh ---------------------------------------------------------------------- diff --git a/core/sqf/sqenvcom.sh 
b/core/sqf/sqenvcom.sh index 59d30cd..84b2e1f 100644 --- a/core/sqf/sqenvcom.sh +++ b/core/sqf/sqenvcom.sh @@ -673,31 +673,31 @@ export SQ_LUNMGR_VERBOSITY=1 # Control SQ default startup behavior (c=cold, w=warm, if removed sqstart will autocheck) export SQ_STARTUP=r -# Monitor process creator: +# +# NOTE: in a Python installation when SQ_MON_RUN_MODE below +# is AGENT the SQ_MON_CREATOR must be MPIRUN # # MPIRUN - monitor process is created by mpirun +# (meaning that mpirun is the parent process of the monitor process) +# AGENT - monitor process runs in agent mode versus MPI collective # -# Uncomment SQ_MON_CREATOR when running monitor in AGENT mode +# Uncomment the next four environment variables #export SQ_MON_CREATOR=MPIRUN - -# Monitor process run mode: -# -# AGENT - monitor process runs in agent mode versus MPI collective -# -# Uncomment the next three environment variables #export SQ_MON_RUN_MODE=AGENT #export MONITOR_COMM_PORT=23390 #export MONITOR_SYNC_PORT=23380 + # -# NAME-SERVER - to disable process replication and enable name-server +# NAME-SERVER - to disable process replication and enable the name-server # -# Uncomment the next six environment variables +# Uncomment the next environment variable #export SQ_NAMESERVER_ENABLED=1 -#export NS_COMM_PORT=23370 -#export NS_SYNC_PORT=23360 -#export NS_M2N_COMM_PORT=23350 -#export MON2MON_COMM_PORT=23340 -#export MONITOR_COMM_PORT=23330 +if [[ "$SQ_NAMESERVER_ENABLED" == "1" ]]; then + export NS_COMM_PORT=23370 + export NS_SYNC_PORT=23360 + export NS_M2N_COMM_PORT=23350 + export MON2MON_COMM_PORT=23340 +fi # Alternative logging capability in monitor export SQ_MON_ALTLOG=0 http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/monitor.env ---------------------------------------------------------------------- diff --git a/core/sqf/sql/scripts/monitor.env b/core/sqf/sql/scripts/monitor.env index 2bfa4a7..25029ce 100644 --- a/core/sqf/sql/scripts/monitor.env +++ 
b/core/sqf/sql/scripts/monitor.env @@ -40,7 +40,7 @@ #MON_TRACE_SYNC_DETAIL=1 #MON_TRACE_MLIO_DETAIL=1 -#MON_TRACE_MEAS +#MON_TRACE_MEAS=1 #MON_TRACE_TMSYNC=1 #MON_TRACE_STATS=1 #MON_TRACE_ENTRY_EXIT=1 http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/sqconfig ---------------------------------------------------------------------- diff --git a/core/sqf/sql/scripts/sqconfig b/core/sqf/sql/scripts/sqconfig index a753549..c45896f 100644 --- a/core/sqf/sql/scripts/sqconfig +++ b/core/sqf/sql/scripts/sqconfig @@ -23,9 +23,9 @@ begin node _virtualnodes 2 end node -#begin name-server -#nodes=0,1 -#end name-server +begin name-server +nodes=0,1 +end name-server ############################################################################### # http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/sqnameserver.pm ---------------------------------------------------------------------- diff --git a/core/sqf/sql/scripts/sqnameserver.pm b/core/sqf/sql/scripts/sqnameserver.pm index 96572bd..87a576c 100644 --- a/core/sqf/sql/scripts/sqnameserver.pm +++ b/core/sqf/sql/scripts/sqnameserver.pm @@ -97,12 +97,12 @@ sub parseStatement { my $eq; ($eq, $s) = parseEq($s); if ($eq) { - while ($s =~ /([A-Za-z0-9-]+)(\s*,\s*)/) { + while ($s =~ /([A-Za-z0-9.\-]+)(\s*,\s*)/) { my $nodeName = $1; $s =~ s:$nodeName$2::; push(@g_nodeNames, $nodeName); } - if ($s =~ /([A-Za-z0-9-]+)/) { + if ($s =~ /([A-Za-z0-9.\-]+)/) { my $nodeName = $1; $s =~ s:$nodeName::; push(@g_nodeNames, $nodeName); http://git-wip-us.apache.org/repos/asf/trafodion/blob/3931a75f/core/sqf/sql/scripts/sqnodes.pm ---------------------------------------------------------------------- diff --git a/core/sqf/sql/scripts/sqnodes.pm b/core/sqf/sql/scripts/sqnodes.pm index 36d8f0c..0d09565 100644 --- a/core/sqf/sql/scripts/sqnodes.pm +++ b/core/sqf/sql/scripts/sqnodes.pm @@ -279,10 +279,10 @@ sub verifyParse displayStmt($stmtOk); print " Error: node-id not specified\n"; } - 
elsif ($nodeId > 255) + elsif ($nodeId > 1023) { displayStmt($stmtOk); - print " Error: node-id must be in the range 0..255.\n"; + print " Error: node-id must be in the range 0..1023.\n"; } if (@cores == 0) {
