Preliminary monitor p-2-p communication changes.
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/abf3c429 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/abf3c429 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/abf3c429 Branch: refs/heads/master Commit: abf3c4291d0f54dc893925151b631fd06aab0883 Parents: 975b0f4 Author: Zalo Correa <[email protected]> Authored: Wed Mar 28 17:02:34 2018 -0700 Committer: Zalo Correa <[email protected]> Committed: Wed Mar 28 17:02:34 2018 -0700 ---------------------------------------------------------------------- core/sqf/conf/log4cxx.monitor.trafns.config | 2 +- .../export/include/common/evl_sqlog_eventnum.h | 24 + core/sqf/export/include/trafconf/trafconfig.h | 2 +- core/sqf/monitor/linux/cluster.cxx | 146 ++-- core/sqf/monitor/linux/internal.h | 22 +- core/sqf/monitor/linux/makefile | 1 + core/sqf/monitor/linux/mlio.cxx | 10 +- core/sqf/monitor/linux/monitor.cxx | 38 +- core/sqf/monitor/linux/montrace.cxx | 4 +- core/sqf/monitor/linux/msgdef.h | 41 +- core/sqf/monitor/linux/nameserver.cxx | 93 ++- core/sqf/monitor/linux/nameserver.h | 1 + core/sqf/monitor/linux/notice.cxx | 155 +++- core/sqf/monitor/linux/notice.h | 5 + core/sqf/monitor/linux/nscommacceptmon.cxx | 95 ++- core/sqf/monitor/linux/nscommacceptmon.h | 1 + core/sqf/monitor/linux/nsprocess.cxx | 12 +- core/sqf/monitor/linux/nsreqnewproc.cxx | 65 +- core/sqf/monitor/linux/nsreqprocinfons.cxx | 206 +++++ core/sqf/monitor/linux/nsreqqueue.cxx | 10 + core/sqf/monitor/linux/pnode.cxx | 149 ++++ core/sqf/monitor/linux/pnode.h | 6 + core/sqf/monitor/linux/process.cxx | 243 ++++-- core/sqf/monitor/linux/process.h | 16 +- core/sqf/monitor/linux/ptpclient.cxx | 717 ++++++++++++----- core/sqf/monitor/linux/ptpclient.h | 40 +- core/sqf/monitor/linux/ptpcommaccept.cxx | 265 ++----- core/sqf/monitor/linux/ptpcommaccept.h | 2 - core/sqf/monitor/linux/replicate.cxx | 30 +- core/sqf/monitor/linux/reqkill.cxx | 26 +- 
core/sqf/monitor/linux/reqnewproc.cxx | 28 +- core/sqf/monitor/linux/reqnotify.cxx | 155 +++- core/sqf/monitor/linux/reqopen.cxx | 74 +- core/sqf/monitor/linux/reqprocinfo.cxx | 28 +- core/sqf/monitor/linux/reqqueue.cxx | 775 +++++++++++++++---- core/sqf/monitor/linux/reqqueue.h | 84 ++ core/sqf/monitor/test/childExitChild.cxx | 1 + core/sqf/monitor/test/childExitParent.cxx | 1 + core/sqf/monitor/test/montestutil.cxx | 4 +- core/sqf/monitor/test/runtest | 242 ++++-- core/sqf/monitor/test/sqconfig.monitor.virtual | 3 +- core/sqf/sqenvcom.sh | 16 +- core/sqf/sql/scripts/monitor.env | 19 +- core/sqf/sql/scripts/pstat | 2 +- core/sqf/sql/scripts/sqconfig | 4 + core/sqf/src/trafconf/trafconf.cpp | 152 +++- core/sqf/src/trafconf/trafconfig.h | 0 47 files changed, 3127 insertions(+), 888 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/conf/log4cxx.monitor.trafns.config ---------------------------------------------------------------------- diff --git a/core/sqf/conf/log4cxx.monitor.trafns.config b/core/sqf/conf/log4cxx.monitor.trafns.config index 40091cb..2fd23e8 100644 --- a/core/sqf/conf/log4cxx.monitor.trafns.config +++ b/core/sqf/conf/log4cxx.monitor.trafns.config @@ -31,7 +31,7 @@ log4j.threshhold=ALL # Rolling File Appender # log4j.appender.trafnsAppender=org.apache.log4j.RollingFileAppender -log4j.appender.trafnsAppender.file=${trafodion.log.dir}/ns${trafodion.log.filename.suffix} +log4j.appender.trafnsAppender.file=${trafodion.log.dir}/trafns${trafodion.log.filename.suffix} log4j.appender.trafnsAppender.maxFileSize=100000000 log4j.appender.trafnsAppender.maxBackupIndex=1 log4j.appender.trafnsAppender.addPid=false http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/export/include/common/evl_sqlog_eventnum.h ---------------------------------------------------------------------- diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h 
b/core/sqf/export/include/common/evl_sqlog_eventnum.h index 8b331ff..e9186e1 100644 --- a/core/sqf/export/include/common/evl_sqlog_eventnum.h +++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h @@ -214,6 +214,8 @@ #define MON_CLUSTER_INITSERVERSOCK_3 101014203 #define MON_CLUSTER_INITSERVERSOCK_4 101014204 #define MON_CLUSTER_INITSERVERSOCK_5 101014205 +#define MON_CLUSTER_INITSERVERSOCK_6 101014206 +#define MON_CLUSTER_INITSERVERSOCK_7 101014207 #define MON_CLUSTER_SOFTNODEDOWN_1 101014301 #define MON_CLUSTER_SOFTNODEDOWN_2 101014302 @@ -435,6 +437,10 @@ #define MON_NODE_ADDLNODES_4 101041204 #define MON_NODE_DELETENODE_1 101041301 #define MON_NODE_STARTNAMESERVER_1 101041401 +#define MON_NODE_GETPROCESSNS_1 101041501 +#define MON_NODE_GETPROCESSNS_2 101041502 +#define MON_NODE_GETPROCESSNS_3 101041503 +#define MON_NODE_GETPROCESSNS_4 101041504 /* Module: config.cxx = 05 */ @@ -687,6 +693,8 @@ #define MON_REQQUEUE_SNAPSHOT_11 101180411 #define MON_REQQUEUE_SNAPSHOT_12 101180412 #define MON_REQQUEUE_SNAPSHOT_13 101180413 +#define MON_REQQUEUE_SNAPSHOT_14 101180414 +#define MON_REQQUEUE_SNAPSHOT_15 101180415 #define MON_REQQUEUE_REVIVE_1 101180501 #define MON_REQQUEUE_REVIVE_2 101180502 #define MON_REQQUEUE_REVIVE_3 101180503 @@ -717,6 +725,16 @@ #define MON_REQ_NAMESERVER_DELETE_4 101181204 #define MON_REQ_NAMESERVER_INFO_1 101181301 +#define MON_REQ_EVALREQ_PERFORMANCE_1 101181501 +#define MON_INTREQ_CLONEPROC_1 101181601 +#define MON_INTREQ_EXIT_1 101181701 +#define MON_INTREQ_NEWPROC_1 101181801 +#define MON_INTREQ_NOTIFY_1 101181901 +#define MON_INTREQ_NOTIFY_2 101181902 +#define MON_INTREQ_NOTIFY_3 101181903 +#define MON_INTREQ_NOTIFY_4 101181904 +#define MON_INTREQ_OPEN_1 101182001 + /* Module: clio.cxx = 19 */ #define MON_CLIO_ACQUIRE_MSG_1 101190101 #define MON_CLIO_ACQUIRE_LOCK_1 101190201 @@ -975,6 +993,10 @@ /* Module: reqnodedown.cxx = 41 */ #define MON_EXT_NAMESERVERDOWN_REQ 101410101 +/* Module: nsreqnewproc.cxx = 42 */ +#define 
NS_EXTNEWPROCNSREQ_1 101420101 +#define NS_EXTNEWPROCNSREQ_2 101420102 + /* Module: tcdb.cxx = 90 */ #define TCDB_TCDB_1 101900101 #define TCDB_TCDB_2 101900102 @@ -994,6 +1016,8 @@ /* Module: tcdbsqlite.cxx = 92 */ #define SQLITE_DB_ACCESS_ERROR 101920101 +/* Module ptpclient.cxx = 93 */ +#define PTPCLIENT_PTPCLIENT_1 101930101 /**********************************************/ http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/export/include/trafconf/trafconfig.h ---------------------------------------------------------------------- diff --git a/core/sqf/export/include/trafconf/trafconfig.h b/core/sqf/export/include/trafconf/trafconfig.h index 65f5e3c..d68a8d0 100644 --- a/core/sqf/export/include/trafconf/trafconfig.h +++ b/core/sqf/export/include/trafconf/trafconfig.h @@ -50,7 +50,7 @@ #define TC_PERSIST_KEY_MAX 64 #define TC_PERSIST_VALUE_MAX 4096 #define TC_PERSIST_KEYS_VALUE_MAX 4096 -#define TC_NODES_MAX 256 +#define TC_NODES_MAX 1024 #define TC_SPARE_NODES_MAX 256 #define TC_UNIQUE_STRING_VALUE_MAX 4096 http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/cluster.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx index 480c509..101cec2 100644 --- a/core/sqf/monitor/linux/cluster.cxx +++ b/core/sqf/monitor/linux/cluster.cxx @@ -64,6 +64,9 @@ using namespace std; #include "zclient.h" #include "commaccept.h" #include "meas.h" +#ifdef NAMESERVER_PROCESS +#include "nscommacceptmon.h" +#endif extern bool IAmIntegrating; extern bool IAmIntegrated; @@ -79,8 +82,10 @@ extern char MyCommPort[MPI_MAX_PORT_NAME]; extern char MyMPICommPort[MPI_MAX_PORT_NAME]; extern char MySyncPort[MPI_MAX_PORT_NAME]; #ifdef NAMESERVER_PROCESS +extern CCommAcceptMon CommAcceptMon; extern char MyMon2NsPort[MPI_MAX_PORT_NAME]; #else +extern bool NameServerEnabled; extern char MyMon2MonPort[MPI_MAX_PORT_NAME]; #endif extern bool 
SMSIntegrating; @@ -250,6 +255,8 @@ void CCluster::ActivateSpare( CNode *spareNode, CNode *downNode, bool checkHealt lnode->PrepareForTransactions( downNode->GetPNid() != spareNode->GetPNid() ); } } +#else + ResetIntegratingPNid(); #endif } @@ -2714,8 +2721,18 @@ void CCluster::HandleMyNodeMsg (struct internal_msg_def *recv_msg, break; case InternalType_Clone: +#ifndef NAMESERVER_PROCESS if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) trace_printf("%s@%d - Internal clone request, completed replicating process (%d, %d) %s\n", method_name, __LINE__, recv_msg->u.clone.nid, recv_msg->u.clone.os_pid, (recv_msg->u.clone.backup?" Backup":"")); +#else + if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) + trace_printf("%s@%d - Internal clone request, process (%d, %d)" + " %s\n", method_name, __LINE__, + recv_msg->u.clone.nid, recv_msg->u.clone.os_pid, + (recv_msg->u.clone.backup?" Backup":"")); + + ReqQueue.enqueueCloneReq( &recv_msg->u.clone ); +#endif break; #ifndef NAMESERVER_PROCESS @@ -4692,6 +4709,20 @@ void CCluster::ResetIntegratingPNid( void ) integratingPNid_ = -1; +#ifdef NAMESERVER_PROCESS + if (!CommAcceptMon.isAccepting()) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Triggering commAcceptorMon thread to begin accepting connections\n", + method_name, __LINE__ ); + } + + // Indicate to the commAcceptor thread to begin accepting connections + CommAcceptMon.startAccepting(); + } +#endif + if (!CommAccept.isAccepting()) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) @@ -8325,7 +8356,11 @@ void CCluster::InitServerSock( void ) char ebuff[256]; char buf[MON_STRING_BUF_SIZE]; snprintf( buf, sizeof(buf) +#ifdef NAMESERVER_PROCESS + , "[%s@%d] MkSrvSock(NS_COMM_PORT=%d) error: %s\n" +#else , "[%s@%d] MkSrvSock(MONITOR_COMM_PORT=%d) error: %s\n" +#endif , method_name, __LINE__, serverCommPort , strerror_r( errno, ebuff, 256 ) ); mon_log_write( MON_CLUSTER_INITSERVERSOCK_2, SQ_LOG_CRIT, 
buf ); @@ -8373,7 +8408,11 @@ void CCluster::InitServerSock( void ) char ebuff[256]; char buf[MON_STRING_BUF_SIZE]; snprintf( buf, sizeof(buf) +#ifdef NAMESERVER_PROCESS + , "[%s@%d] MkSrvSock(NS_SYNC_PORT=%d) error: %s\n" +#else , "[%s@%d] MkSrvSock(MONITOR_SYNC_PORT=%d) error: %s\n" +#endif , method_name, __LINE__, serverSyncPort , strerror_r( errno, ebuff, 256 ) ); mon_log_write( MON_CLUSTER_INITSERVERSOCK_3, SQ_LOG_CRIT, buf ); @@ -8443,56 +8482,63 @@ void CCluster::InitServerSock( void ) } #else - env = getenv("MONITOR_COMM_PORT"); - if ( env ) - { - int val; - errno = 0; - val = strtol(env, NULL, 10); - if ( errno == 0) mon2monPort = val; - } - else - { - mon2monPort = 23399; - } - - // For virtual env, add PNid to the port so we can still test without collisions of port numbers - if (!IsRealCluster) + if (NameServerEnabled) { - mon2monPort += MyNode->GetPNid(); - } - mon2monSock_ = MkSrvSock( &mon2monPort ); - - - if ( mon2monSock_ < 0 ) - { - char ebuff[256]; - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s@%d] MkSrvSock(MON2MONSERVER_COMM_PORT=%d) error: %s\n" - , method_name, __LINE__, mon2monPort - , strerror_r( errno, ebuff, 256 ) ); - mon_log_write( MON_CLUSTER_INITSERVERSOCK_4, SQ_LOG_CRIT, buf ); - abort(); - } - else - { - snprintf( MyMon2MonPort, sizeof(MyMon2MonPort) - , "%d.%d.%d.%d:%d" - , (int)((unsigned char *)addr)[0] - , (int)((unsigned char *)addr)[1] - , (int)((unsigned char *)addr)[2] - , (int)((unsigned char *)addr)[3] - , mon2monPort ); - MyNode->SetMon2MonPort( MyMon2MonPort ); - - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - trace_printf( "%s@%d Initialized my mon2mon socket port, " - "pnid=%d (%s:%s) (mon2monPort=%s)\n" - , method_name, __LINE__ - , MyPNID, MyNode->GetName(), MyMon2MonPort - , MyNode->GetMon2MonPort() ); - + env = getenv("MON2MON_COMM_PORT"); + if ( env ) + { + int val; + errno = 0; + val = strtol(env, NULL, 10); + if ( errno == 0) mon2monPort = val; + } + else + { + char 
buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s@%d] MON2MON_COMM_PORT environment variable is not set!\n" + , method_name, __LINE__ ); + mon_log_write( MON_CLUSTER_INITSERVERSOCK_5, SQ_LOG_CRIT, buf ); + abort(); + } + + // For virtual env, add PNid to the port so we can still test without collisions of port numbers + if (!IsRealCluster) + { + mon2monPort += MyNode->GetPNid(); + } + + mon2monSock_ = MkSrvSock( &mon2monPort ); + if ( mon2monSock_ < 0 ) + { + char ebuff[MON_STRING_BUF_SIZE]; + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s@%d] MkSrvSock(MON2MON_COMM_PORT=%d) error: %s\n" + , method_name, __LINE__, mon2monPort + , strerror_r( errno, ebuff, MON_STRING_BUF_SIZE ) ); + mon_log_write( MON_CLUSTER_INITSERVERSOCK_6, SQ_LOG_CRIT, buf ); + abort(); + } + else + { + snprintf( MyMon2MonPort, sizeof(MyMon2MonPort) + , "%d.%d.%d.%d:%d" + , (int)((unsigned char *)addr)[0] + , (int)((unsigned char *)addr)[1] + , (int)((unsigned char *)addr)[2] + , (int)((unsigned char *)addr)[3] + , mon2monPort ); + MyNode->SetMon2MonPort( MyMon2MonPort ); + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + trace_printf( "%s@%d Initialized my mon2mon socket port, " + "pnid=%d (%s:%s) (mon2monPort=%s)\n" + , method_name, __LINE__ + , MyPNID, MyNode->GetName(), MyMon2MonPort + , MyNode->GetMon2MonPort() ); + + } } #endif @@ -8503,7 +8549,7 @@ void CCluster::InitServerSock( void ) char buf[MON_STRING_BUF_SIZE]; snprintf( buf, sizeof(buf), "[%s@%d] epoll_create1() error: %s\n", method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); - mon_log_write( MON_CLUSTER_INITSERVERSOCK_4, SQ_LOG_CRIT, buf ); + mon_log_write( MON_CLUSTER_INITSERVERSOCK_7, SQ_LOG_CRIT, buf ); MPI_Abort( MPI_COMM_SELF,99 ); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/internal.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/internal.h 
b/core/sqf/monitor/linux/internal.h index fab37da..505e1c3 100644 --- a/core/sqf/monitor/linux/internal.h +++ b/core/sqf/monitor/linux/internal.h @@ -284,12 +284,14 @@ struct node_name_def struct notify_def { - int nid; // Node id of process being notified - int pid; // Process id of process being notified - int canceled; // If true, notice request has been canceled - int target_nid; // Node id of process being monitored - int target_pid; // Process id of process being monitored - _TM_Txid_External trans_id; // Associated TransID + int nid; // Node id of process being notified + int pid; // Process id of process being notified + Verifier_t verifier; // Verifier of the requesting process + int canceled; // If true, notice request has been canceled + int target_nid; // Node id of process being monitored + int target_pid; // Process id of process being monitored + Verifier_t target_verifier; // Verifier of the process being monitored + _TM_Txid_External trans_id; // Associated TransID }; @@ -417,6 +419,9 @@ struct uniqstr_def char valueData; // variable length string data }; +// Define a constant giving the "header" size of the internal_msg_def below +#define MSG_HDR_SIZE ( sizeof (InternalType) + sizeof (int) ) + struct internal_msg_def { InternalType type; @@ -450,13 +455,10 @@ struct internal_msg_def struct uniqstr_def uniqstr; struct shutdown_def shutdown; struct scheddata_def scheddata; + char buffer[MAX_SYNC_SIZE-MSG_HDR_SIZE]; // Limit entire buffer to MAX_SYNC_SIZE } u; } __attribute__((__may_alias__)); -// Define a constant giving the "header" size of the internal_msg_def above -#define MSG_HDR_SIZE ( sizeof (InternalType) + sizeof (int) ) - - typedef struct nodeId_s { char nodeName[MPI_MAX_PROCESSOR_NAME]; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/makefile ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/makefile b/core/sqf/monitor/linux/makefile index 
0707d18..73127e4 100644 --- a/core/sqf/monitor/linux/makefile +++ b/core/sqf/monitor/linux/makefile @@ -266,6 +266,7 @@ NSOBJS += $(OUTDIR)/nsreqdelproc.o NSOBJS += $(OUTDIR)/nsreqstop.o NSOBJS += $(OUTDIR)/nsreqnewproc.o NSOBJS += $(OUTDIR)/nsreqprocinfo.o +NSOBJS += $(OUTDIR)/nsreqprocinfons.o NSOBJS += $(OUTDIR)/nsreqstart.o NSOBJS += $(OUTDIR)/nsreqshutdown.o ifeq ($(USE_FORK_SUSPEND_RESUME),1) http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/mlio.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/mlio.cxx b/core/sqf/monitor/linux/mlio.cxx index 7b7158c..0f16ea0 100644 --- a/core/sqf/monitor/linux/mlio.cxx +++ b/core/sqf/monitor/linux/mlio.cxx @@ -104,6 +104,7 @@ const int SQ_LocalIOToClient::serviceRequestSize[] = { sizeof(REQTYPE) + sizeof( NodeDelete_def ), // ReqType_NodeDelete sizeof(REQTYPE) + sizeof( NodeDown_def ), // ReqType_NodeDown sizeof(REQTYPE) + sizeof( NodeInfo_def ), // ReqType_NodeInfo + sizeof(REQTYPE) + sizeof( NodeName_def ), // ReqType_NodeName sizeof(REQTYPE) + sizeof( NodeUp_def ), // ReqType_NodeUp 0, // ReqType_Notice -- not an actual request sizeof(REQTYPE) + sizeof( Notify_def ), // ReqType_Notify @@ -122,8 +123,7 @@ const int SQ_LocalIOToClient::serviceRequestSize[] = { sizeof(REQTYPE) + sizeof( TmReady_def ), // ReqType_TmReady sizeof(REQTYPE) + sizeof( TmSync_def ), // ReqType_TmSync sizeof(REQTYPE) + sizeof( TransInfo_def ), // ReqType_TransInfo - sizeof(REQTYPE) + sizeof( ZoneInfo_def ), // ReqType_ZoneInfo - sizeof(REQTYPE) + sizeof( NodeName_def ) // ReqType_NodeName + sizeof(REQTYPE) + sizeof( ZoneInfo_def ) // ReqType_ZoneInfo }; // The serviceReplySize array holds the size of the request messages for @@ -137,17 +137,19 @@ const int SQ_LocalIOToClient::serviceReplySize[] = { sizeof(REPLYTYPE) + sizeof( MonStats_reply_def ), // ReplyType_MonStats sizeof(REPLYTYPE) + sizeof( Mount_reply_def ), // ReplyType_Mount sizeof(REPLYTYPE) + 
sizeof( NewProcess_reply_def ), // ReplyType_NewProcess + sizeof(REPLYTYPE) + sizeof( NewProcessNs_reply_def ),// ReplyType_NewProcessNs sizeof(REPLYTYPE) + sizeof( NodeInfo_reply_def ), // ReplyType_NodeInfo + sizeof(REPLYTYPE) + sizeof( NodeName_reply_def ), // ReplyType_NodeName sizeof(REPLYTYPE) + sizeof( Open_reply_def ), // ReplyType_Open sizeof(REPLYTYPE) + sizeof( OpenInfo_reply_def ), // ReplyType_OpenInfo sizeof(REPLYTYPE) + sizeof( PNodeInfo_reply_def ), // ReplyType_PNodeInfo sizeof(REPLYTYPE) + sizeof( ProcessInfo_reply_def ), // ReplyType_ProcessInfo + sizeof(REPLYTYPE) + sizeof( ProcessInfoNs_reply_def ),// ReplyType_ProcessInfoNs sizeof(REPLYTYPE) + sizeof( Stfsd_reply_def ), // ReplyType_Stfsd sizeof(REPLYTYPE) + sizeof( Startup_reply_def ), // ReplyType_Startup sizeof(REPLYTYPE) + sizeof( TmSync_reply_def ), // ReplyType_TmSync sizeof(REPLYTYPE) + sizeof( TransInfo_reply_def ), // ReplyType_TransInfo - sizeof(REPLYTYPE) + sizeof( ZoneInfo_reply_def ), // ReplyType_ZoneInfo - sizeof(REPLYTYPE) + sizeof( NodeName_reply_def ) // ReplyType_NodeName + sizeof(REPLYTYPE) + sizeof( ZoneInfo_reply_def ) // ReplyType_ZoneInfo }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/monitor.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/monitor.cxx b/core/sqf/monitor/linux/monitor.cxx index 1e623a4..4c67b15 100755 --- a/core/sqf/monitor/linux/monitor.cxx +++ b/core/sqf/monitor/linux/monitor.cxx @@ -565,9 +565,10 @@ bool CMonitor::CompleteProcessStartup (struct message_def * msg) msg->u.request.u.startup.process_name, msg->u.request.u.startup.port_name); } - +#ifndef NAMESERVER_PROCESS CProcessContainer::ParentNewProcReply( process, MPI_SUCCESS); status = SUCCESS; +#endif } else { @@ -1191,6 +1192,14 @@ int main (int argc, char *argv[]) #endif } +#ifndef NAMESERVER_PROCESS + env = getenv("SQ_NAMESERVER_ENABLE"); + if ( env && isdigit(*env) ) + { + NameServerEnabled 
= atoi(env); + } +#endif + if ( IsAgentMode || IsNameServer ) { MON_Props xprops( true ); @@ -1813,12 +1822,11 @@ int main (int argc, char *argv[]) Monitor = new CMonitor (procTermSig); #endif #ifndef NAMESERVER_PROCESS - env = getenv("SQ_NAMESERVER_ENABLED"); - if ( env && isdigit(*env) ) - NameServerEnabled = atoi(env); - - NameServer = new CNameServer (); - PtpClient = new CPtpClient (); + if (NameServerEnabled) + { + NameServer = new CNameServer (); + PtpClient = new CPtpClient (); + } #endif if ( IsAgentMode ) @@ -1937,9 +1945,18 @@ int main (int argc, char *argv[]) // Create thread to accept connections from other monitors CommAccept.start(); #ifdef NAMESERVER_PROCESS + // Create thread to accept connections from other name servers CommAcceptMon.start(); + if (IsMaster) + { + CommAcceptMon.startAccepting(); + } #else - PtpCommAccept.start(); + if (NameServerEnabled) + { + // Create thread to accept point-2-point connections from other monitors + PtpCommAccept.start(); + } #endif #ifndef NAMESERVER_PROCESS // Open file used to record process start/end times @@ -2284,7 +2301,10 @@ int main (int argc, char *argv[]) #ifndef NAMESERVER_PROCESS // Tell the LIO worker threads to exit SQ_theLocalIOToClient->shutdownWork(); - PtpCommAccept.shutdownWork(); + if (NameServerEnabled) + { + PtpCommAccept.shutdownWork(); + } #endif CommAccept.shutdownWork(); http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/montrace.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/montrace.cxx b/core/sqf/monitor/linux/montrace.cxx index d294724..96eab05 100644 --- a/core/sqf/monitor/linux/montrace.cxx +++ b/core/sqf/monitor/linux/montrace.cxx @@ -61,7 +61,9 @@ const CMonTrace::TraceArea_t CMonTrace::traceAreaList_[] = {"MON_TRACE_REDIRECTION", TRACE_REDIRECTION}, {"MON_TRACE_TRAFCONFIG", TRACE_TRAFCONFIG}, {"MON_TRACE_HEALTH", TRACE_HEALTH}, - {"MON_TRACE_SIG_HANDLER", TRACE_SIG_HANDLER} + 
{"MON_TRACE_SIG_HANDLER", TRACE_SIG_HANDLER}, + {"MON_TRACE_NS", TRACE_NS}, + {"MON_TRACE_MEAS", TRACE_MEAS} }; // Global trace flags http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/msgdef.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h index b76862d..d974462 100644 --- a/core/sqf/monitor/linux/msgdef.h +++ b/core/sqf/monitor/linux/msgdef.h @@ -251,6 +251,7 @@ typedef enum { ReqType_PNodeInfo, // physical node information request ReqType_ProcessInfo, // process information request ReqType_ProcessInfoCont, // process information request (continuation) + ReqType_ProcessInfoNs, // process information request (monitor) ReqType_Set, // add configuration information to the registry ReqType_Shutdown, // request cluster shutdown ReqType_ShutdownNs, // request nameserver shutdown @@ -286,6 +287,7 @@ typedef enum { ReplyType_OpenInfo, // reply with list of opens for a process ReplyType_PNodeInfo, // reply with info on list of physical nodes ReplyType_ProcessInfo, // reply with info on list of processes + ReplyType_ProcessInfoNs, // reply with info of process ReplyType_Stfsd, // reply with stfsd info ReplyType_Startup, // reply with startup info ReplyType_TmSync, // reply from unsolicited TmSync message @@ -576,14 +578,16 @@ struct NewProcessNs_def bool event_messages; // true if want event messages bool system_messages; // true if want system messages long long tag; // user defined tag to be sent in completion notice - char path[MAX_SEARCH_PATH]; // process's object lookup path to program - char ldpath[MAX_SEARCH_PATH]; // process's library load path for program - char program[MAX_PROCESS_PATH]; // full path to object file + strId_t pathStrId; // program lookup path (string id) + strId_t ldpathStrId; // library load path (string id) + strId_t programStrId; // full path to object file (string id) char process_name[MAX_PROCESS_NAME]; // 
process name + char port_name[MPI_MAX_PORT_NAME]; // mpi port name from MPI_Open_port int argc; // number of additional command line argument char argv[MAX_ARGS][MAX_ARG_SIZE]; // array of additional command line arguments char infile[MAX_PROCESS_PATH]; // if null then use monitor's infile char outfile[MAX_PROCESS_PATH]; // if null then use monitor's outfile + struct timespec creation_time; // creation time int fill1; // filler to fill out struct }; @@ -958,6 +962,36 @@ struct ProcessInfo_reply_def bool more_data; // true if have additional process data }; +struct ProcessInfoNs_reply_def +{ + int nid; // node id + int pid; // process id + Verifier_t verifier; // process verifier + char process_name[MAX_PROCESS_NAME]; // process name + PROCESSTYPE type; // Identifies the process handling category + int parent_nid; // parent's node id + int parent_pid; // parent's process id + Verifier_t parent_verifier; // parent's process verifier + int priority; // Linux system priority + int backup; // if non-zero, starts process as backup + STATE state; // process's current state + bool unhooked; // if hooked, the death of the parent process triggers the child process to exit + bool event_messages; // true if want event messages + bool system_messages; // true if want system messages + long long tag; // user defined tag to be sent in completion notice + char program[MAX_PROCESS_PATH]; // process's object file name + strId_t pathStrId; // program lookup path (string id) + strId_t ldpathStrId; // library load path (string id) + strId_t programStrId; // full path to object file (string id) + char port_name[MPI_MAX_PORT_NAME]; // mpi port name from MPI_Open_port + int argc; // number of additional command line arguments + char argv[MAX_ARGS][MAX_ARG_SIZE]; // array of additional command line arguments + char infile[MAX_PROCESS_PATH]; // if null then use monitor's infile + char outfile[MAX_PROCESS_PATH]; // if null then use monitor's outfile + struct timespec creation_time; // creation time + int 
return_code; // mpi error code of error +}; + struct ProcessInfoCont_def { int nid; // requesting process's node id @@ -1233,6 +1267,7 @@ struct reply_def struct OpenInfo_reply_def open_info; struct PNodeInfo_reply_def pnode_info; struct ProcessInfo_reply_def process_info; + struct ProcessInfoNs_reply_def process_info_ns; struct Startup_reply_def startup_info; #ifdef SQ_STFSD struct Stfsd_reply_def stfsd; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nameserver.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nameserver.cxx b/core/sqf/monitor/linux/nameserver.cxx index 1e32972..b201ae6 100644 --- a/core/sqf/monitor/linux/nameserver.cxx +++ b/core/sqf/monitor/linux/nameserver.cxx @@ -534,6 +534,17 @@ int CNameServer::ProcessInfoCont( struct message_def* msg ) return error; } +int CNameServer::ProcessInfoNs( struct message_def* msg ) +{ + const char method_name[] = "CNameServer::ProcessInfoNs"; + TRACE_ENTRY; + + int error = SendReceive( msg ); + + TRACE_EXIT; + return error; +} + int CNameServer::ProcessNew(CProcess* process ) { const char method_name[] = "CNameServer::ProcessNew"; @@ -564,10 +575,20 @@ int CNameServer::ProcessNew(CProcess* process ) msgnew->verifier = process->GetVerifier(); msgnew->type = process->GetType(); msgnew->priority = process->GetPriority(); + msgnew->backup = process->IsBackup(); + msgnew->unhooked = process->IsUnhooked(); msgnew->event_messages = process->IsEventMessages(); msgnew->system_messages = process->IsSystemMessages(); + msgnew->pathStrId = process->pathStrId(); + msgnew->ldpathStrId = process->ldPathStrId(); + msgnew->programStrId = process->programStrId(); strcpy( msgnew->process_name, process->GetName() ); - strcpy( msgnew->program, process->program() ); + strcpy( msgnew->port_name, process->GetPort() ); + msgnew->argc = process->argc(); + memcpy(msgnew->argv, process->userArgv(), process->userArgvLen()); + strcpy( 
msgnew->infile, process->infile() ); + strcpy( msgnew->outfile, process->outfile() ); + msgnew->creation_time = process->GetCreationTime(); int error = SendReceive(&msg ); @@ -650,6 +671,10 @@ int CNameServer::SendReceive( struct message_def* msg ) descp = (char *) "process-info-cont"; size += sizeof(msg->u.request.u.process_info_cont); break; + case ReqType_ProcessInfoNs: + descp = (char *) "process-info-ns"; + size += sizeof(msg->u.request.u.process_info); + break; case ReqType_ShutdownNs: msgshutdown = &msg->u.request.u.shutdown_ns; sprintf( desc, "shutdown (nid=%d, pid=%d, level=%d)", @@ -670,7 +695,7 @@ int CNameServer::SendReceive( struct message_def* msg ) { if ( trace_settings & ( TRACE_NS | TRACE_PROCESS ) ) { - char desc[200]; + char desc[2048]; char* descp = desc; switch ( msg->u.reply.type ) { @@ -704,6 +729,70 @@ int CNameServer::SendReceive( struct message_def* msg ) msg->u.reply.u.process_info.return_code, msg->u.reply.u.process_info.more_data ); break; + case ReplyType_ProcessInfoNs: +// int argvLen = sizeof(msg->u.reply.u.process_info_ns.argv); + sprintf( desc, + "process-info-ns reply:\n" + " process_info_ns.nid=%d\n" + " process_info_ns.pid=%d\n" + " process_info_ns.verifier=%d\n" + " process_info_ns.process_name=%s\n" + " process_info_ns.type=%d\n" + " process_info_ns.parent_nid=%d\n" + " process_info_ns.parent_pid=%d\n" + " process_info_ns.parent_verifier=%d\n" + " process_info_ns.priority=%d\n" + " process_info_ns.backup=%d\n" + " process_info_ns.state=%d\n" + " process_info_ns.unhooked=%d\n" + " process_info_ns.event_messages=%d\n" + " process_info_ns.system_messages=%d\n" + " process_info_ns.program=%s\n" + " process_info_ns.pathStrId=%d:%d\n" + " process_info_ns.ldpathStrId=%d:%d\n" + " process_info_ns.programStrId=%d:%d\n" + " process_info_ns.port_name=%s\n" + " process_info_ns.argc=%d\n" +// " process_info_ns.argv=[%.*s]\n" + " process_info_ns.infile=%s\n" + " process_info_ns.outfile=%s\n" +//#if 0 +// " 
process_info_ns.creation_time=%ld(secs)\n", +// " process_info_ns.creation_time=%ld(secs):%ld(nsecs)\n", +//#endif + " process_info_ns.return_code=%d" + , msg->u.reply.u.process_info_ns.nid + , msg->u.reply.u.process_info_ns.pid + , msg->u.reply.u.process_info_ns.verifier + , msg->u.reply.u.process_info_ns.process_name + , msg->u.reply.u.process_info_ns.type + , msg->u.reply.u.process_info_ns.parent_nid + , msg->u.reply.u.process_info_ns.parent_pid + , msg->u.reply.u.process_info_ns.parent_verifier + , msg->u.reply.u.process_info_ns.priority + , msg->u.reply.u.process_info_ns.backup + , msg->u.reply.u.process_info_ns.state + , msg->u.reply.u.process_info_ns.unhooked + , msg->u.reply.u.process_info_ns.event_messages + , msg->u.reply.u.process_info_ns.system_messages + , msg->u.reply.u.process_info_ns.program + , msg->u.reply.u.process_info_ns.pathStrId.nid + , msg->u.reply.u.process_info_ns.pathStrId.id + , msg->u.reply.u.process_info_ns.ldpathStrId.nid + , msg->u.reply.u.process_info_ns.ldpathStrId.id + , msg->u.reply.u.process_info_ns.programStrId.nid + , msg->u.reply.u.process_info_ns.programStrId.id + , msg->u.reply.u.process_info_ns.port_name + , msg->u.reply.u.process_info_ns.argc +// , &msg->u.reply.u.process_info_ns.argv + , msg->u.reply.u.process_info_ns.infile + , msg->u.reply.u.process_info_ns.outfile +//#if 0 +// , msg->u.reply.u.process_info_ns.creation_time.tv_sec +// , msg->u.reply.u.process_info_ns.creation_time.tv_nsec +//#endif + , msg->u.reply.u.process_info_ns.return_code ); + break; default: descp = (char *) "UNKNOWN"; break; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nameserver.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nameserver.h b/core/sqf/monitor/linux/nameserver.h index e3ee6eb..a8ccb4b 100644 --- a/core/sqf/monitor/linux/nameserver.h +++ b/core/sqf/monitor/linux/nameserver.h @@ -44,6 +44,7 @@ public: int ProcessDelete(CProcess* 
process ); int ProcessInfo( struct message_def* msg ); int ProcessInfoCont( struct message_def* msg ); + int ProcessInfoNs( struct message_def* msg ); int ProcessNew(CProcess* process ); int ProcessShutdown( void ); void SetLocalHost( void ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/notice.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/notice.cxx b/core/sqf/monitor/linux/notice.cxx index 4695210..85a9eab 100644 --- a/core/sqf/monitor/linux/notice.cxx +++ b/core/sqf/monitor/linux/notice.cxx @@ -38,11 +38,17 @@ using namespace std; #include "pnode.h" #include "notice.h" #include "mlio.h" - #include "replicate.h" +#include "ptpclient.h" +#include "nameserver.h" extern int trace_level; extern CMonitor *Monitor; +extern int NameServerEnabled; +extern CNameServer *NameServer; +extern CPtpClient *PtpClient; +extern CNode *MyNode; + extern CNodeContainer *Nodes; extern CMonStats *MonStats; extern int MyPNID; @@ -283,10 +289,10 @@ void CNotice::Notify( SQ_LocalIOToClient::bcastPids_t *bcastPids ) trace_printf( "%s@%d - Process %s (%d, %d:%d)" " doesn't want Death message" "\n" , method_name, __LINE__ - , Process->GetName() - , Process->GetNid() - , Process->GetPid() - , Process->GetVerifier() ); + , notify->GetName() + , notify->GetNid() + , notify->GetPid() + , notify->GetVerifier() ); } } else @@ -294,10 +300,10 @@ void CNotice::Notify( SQ_LocalIOToClient::bcastPids_t *bcastPids ) if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) trace_printf( "%s@%d - Not processed for clone Process %s (%d, %d:%d)\n" , method_name, __LINE__ - , Process->GetName() - , Process->GetNid() - , Process->GetPid() - , Process->GetVerifier() ); + , notify->GetName() + , notify->GetNid() + , notify->GetPid() + , notify->GetVerifier() ); } } else @@ -373,3 +379,134 @@ void CNotice::NotifyAll( void ) TRACE_EXIT; } +void CNotice::NotifyRemote( void ) +{ + const char 
method_name[] = "CNotice::NotifyRemote"; + TRACE_ENTRY; + + int targetNid = -1; + CNotice *entry = NULL; + NidQueue_t *nidQueue = new NidQueue_t; + + // build the nid queue of nodes to notify + for( entry=this; entry; entry=entry->Next ) + { + entry->NotifyNid( nidQueue ); + } + + while ( !nidQueue->empty() ) + { + targetNid = nidQueue->front(); + CLNode *targetLNode = Nodes->GetLNode( targetNid ); + + int rc = -1; + // Send the process exit to the target node + rc = PtpClient->ProcessExit( Process + , targetNid + , targetLNode->GetNode()->GetName() ); + if (rc) + { + // TODO: Error handling + } + nidQueue->pop(); + } + + delete nidQueue; + + TRACE_EXIT; +} + +void CNotice::NotifyNid( NidQueue_t *nidQueue ) +{ + const char method_name[] = "CNotice::NotifyNid"; + TRACE_ENTRY; + + CProcess *remoteProcess; + + if ( canceled_ ) + { + if (trace_settings & (TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) + trace_printf( "%s@%d - Process death notice for process %s (%d, %d:%d) " + "was canceled so not delivered to process %s (%d, %d:%d)\n" + , method_name, __LINE__ + , Process->GetName() + , Process->GetNid() + , Process->GetPid() + , Process->GetVerifier() + , name_.c_str() + , Nid + , Pid + , verifier_); + } + else + { + if ( !MyNode->IsMyNode(Nid) ) + { + // find by nid (check node state, check process state, backup is Ok) + remoteProcess = Nodes->GetProcess( Nid + , Pid + , verifier_ + , true, true, true ); + if( remoteProcess ) + { + if (remoteProcess->IsSystemMessages() ) + { + // Add this process' nid to the queue + nidQueue->push( Nid); + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + CLNode *lnode = Nodes->GetLNode( Nid ); + trace_printf( "%s@%d - Sending process %s (%d, %d:%d) " + "exit message to %s (nid=%d)\n" + , method_name, __LINE__ + , Process->GetName() + , Process->GetNid() + , Process->GetPid() + , Process->GetVerifier() + , lnode?lnode->GetNode()->GetName():"" + , lnode?lnode->GetNode()->GetPNid():Nid); + } + } + else + { + if 
(trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Process %s (%d, %d:%d)" + " doesn't want Death message\n" + , method_name, __LINE__ + , remoteProcess->GetName() + , remoteProcess->GetNid() + , remoteProcess->GetPid() + , remoteProcess->GetVerifier() ); + } + } + } + else + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Can't find process %s (%d, %d:%d)\n" + , method_name, __LINE__ + , name_.c_str() + , Nid + , Pid + , verifier_ ); + } + } + } + else + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Not processed for local Process %s (%d, %d:%d)\n" + , method_name, __LINE__ + , Process->GetName() + , Process->GetNid() + , Process->GetPid() + , Process->GetVerifier() ); + } + } + } + TRACE_EXIT; +} + http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/notice.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/notice.h b/core/sqf/monitor/linux/notice.h index c3888a0..c2ad3ff 100644 --- a/core/sqf/monitor/linux/notice.h +++ b/core/sqf/monitor/linux/notice.h @@ -27,8 +27,11 @@ #define NOTICE_H_ #ifndef NAMESERVER_PROCESS +#include <queue> #include "mlio.h" +typedef queue<int> NidQueue_t; // (nid) + class CProcess; class CNotice @@ -60,11 +63,13 @@ public: , _TM_Txid_External trans_id ); CNotice *Link( CNotice *entry ); void NotifyAll( void ); + void NotifyRemote( void ); bool isCanceled ( void ) { return canceled_; } protected: private: void Notify( SQ_LocalIOToClient::bcastPids_t *bcastPids ); + void NotifyNid( NidQueue_t *nidQueue ); bool canceled_; // true if notice request has been canceled CProcess *Process; // process that is being monitored CNotice *Next; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nscommacceptmon.cxx ---------------------------------------------------------------------- diff --git 
a/core/sqf/monitor/linux/nscommacceptmon.cxx b/core/sqf/monitor/linux/nscommacceptmon.cxx index 39213f7..633eb2d 100644 --- a/core/sqf/monitor/linux/nscommacceptmon.cxx +++ b/core/sqf/monitor/linux/nscommacceptmon.cxx @@ -50,7 +50,7 @@ extern CReqQueue ReqQueue; static void *mon2nsProcess(void *arg); CCommAcceptMon::CCommAcceptMon() - : accepting_(true) + : accepting_(false) , shutdown_(false) , thread_id_(0) { @@ -242,6 +242,46 @@ void CCommAcceptMon::monReqProcessInfoCont( struct message_def* msg, int sockFd TRACE_EXIT; } +void CCommAcceptMon::monReqProcessInfoNs( struct message_def* msg, int sockFd ) +{ + const char method_name[] = "CCommAcceptMon::monReqProcessInfoNs"; + TRACE_ENTRY; + + if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) + { + trace_printf( "%s@%d - Received monitor request process-info-ns data.\n" + " msg.info.nid=%d\n" + " msg.info.pid=%d\n" + " msg.info.verifier=%d\n" + " msg.info.target_nid=%d\n" + " msg.info.target_pid=%d\n" + " msg.info.target_verifier=%d\n" + " msg.info.target_process_name=%s\n" + " msg.info.target_process_pattern=%s\n" + " msg.info.type=%d\n" + , method_name, __LINE__ + , msg->u.request.u.process_info.nid + , msg->u.request.u.process_info.pid + , msg->u.request.u.process_info.verifier + , msg->u.request.u.process_info.target_nid + , msg->u.request.u.process_info.target_pid + , msg->u.request.u.process_info.target_verifier + , msg->u.request.u.process_info.target_process_name + , msg->u.request.u.process_info.target_process_pattern + , msg->u.request.u.process_info.type + ); + } + + CExternalReq::reqQueueMsg_t msgType; + msgType = CExternalReq::NonStartupMsg; + int nid = msg->u.request.u.process_info.nid; + int pid = msg->u.request.u.process_info.pid; + // Place new request on request queue + ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); + + TRACE_EXIT; +} + void CCommAcceptMon::monReqNewProcess( struct message_def* msg, int sockFd ) { const char method_name[] = "CCommAcceptMon::monReqNewProcess"; @@ -249,25 
+289,55 @@ void CCommAcceptMon::monReqNewProcess( struct message_def* msg, int sockFd ) if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) { trace_printf( "%s@%d - Received monitor request new-process data.\n" - " msg.new_process_ns.parent_nid=%d\n" - " msg.new_process_ns.parent_pid=%d\n" - " msg.new_process_ns.parent_verifier=%d\n" + " msg.new_process_ns.process_name=%s\n" " msg.new_process_ns.nid=%d\n" " msg.new_process_ns.pid=%d\n" " msg.new_process_ns.verifier=%d\n" " msg.new_process_ns.type=%d\n" + " msg.new_process_ns.parent_nid=%d\n" + " msg.new_process_ns.parent_pid=%d\n" + " msg.new_process_ns.parent_verifier=%d\n" " msg.new_process_ns.priority=%d\n" - " msg.new_process_ns.process_name=%s\n" + " msg.new_process_ns.backup=%d\n" + " msg.new_process_ns.unhooked=%d\n" + " msg.new_process_ns.event_messages=%d\n" + " msg.new_process_ns.system_messages=%d\n" + " msg.new_process_ns.pathStrId=%d:%d\n" + " msg.new_process_ns.ldpathStrId=%d:%d\n" + " msg.new_process_ns.programStrId=%d:%d\n" + " msg.new_process_ns.port=%s\n" + " msg.new_process_ns.argc=%d\n" + //" msg.new_process_ns.argv=%s\n" + " msg.new_process_ns.infile=%s\n" + " msg.new_process_ns.outfile=%s\n" + " msg.new_process_ns.creation_time=%ld(secs):%ld(nsecs)\n" , method_name, __LINE__ - , msg->u.request.u.new_process_ns.parent_nid - , msg->u.request.u.new_process_ns.parent_pid - , msg->u.request.u.new_process_ns.parent_verifier + , msg->u.request.u.new_process_ns.process_name , msg->u.request.u.new_process_ns.nid , msg->u.request.u.new_process_ns.pid , msg->u.request.u.new_process_ns.verifier , msg->u.request.u.new_process_ns.type + , msg->u.request.u.new_process_ns.parent_nid + , msg->u.request.u.new_process_ns.parent_pid + , msg->u.request.u.new_process_ns.parent_verifier , msg->u.request.u.new_process_ns.priority - , msg->u.request.u.new_process_ns.process_name + , msg->u.request.u.new_process_ns.backup + , msg->u.request.u.new_process_ns.unhooked + , msg->u.request.u.new_process_ns.event_messages + 
, msg->u.request.u.new_process_ns.system_messages + , msg->u.request.u.new_process_ns.pathStrId.nid + , msg->u.request.u.new_process_ns.pathStrId.id + , msg->u.request.u.new_process_ns.ldpathStrId.nid + , msg->u.request.u.new_process_ns.ldpathStrId.id + , msg->u.request.u.new_process_ns.programStrId.nid + , msg->u.request.u.new_process_ns.programStrId.id + , msg->u.request.u.new_process_ns.port_name + , msg->u.request.u.new_process_ns.argc + //, msg->u.request.u.new_process_ns.argv + , msg->u.request.u.new_process_ns.infile + , msg->u.request.u.new_process_ns.outfile + , msg->u.request.u.new_process_ns.creation_time.tv_sec + , msg->u.request.u.new_process_ns.creation_time.tv_nsec ); } @@ -469,6 +539,9 @@ void CCommAcceptMon::processMonReqs( int sockFd ) case ReqType_ProcessInfoCont: rtype = "ProcessInfoCont"; break; + case ReqType_ProcessInfoNs: + rtype = "ProcessInfoNs"; + break; case ReqType_NameServerStop: rtype = "NameServerStop"; break; @@ -515,6 +588,10 @@ void CCommAcceptMon::processMonReqs( int sockFd ) monReqProcessInfoCont(&msg, sockFd); break; + case ReqType_ProcessInfoNs: + monReqProcessInfoNs(&msg, sockFd); + break; + case ReqType_NewProcessNs: monReqNewProcess(&msg, sockFd); break; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nscommacceptmon.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nscommacceptmon.h b/core/sqf/monitor/linux/nscommacceptmon.h index 6caf699..30daa92 100644 --- a/core/sqf/monitor/linux/nscommacceptmon.h +++ b/core/sqf/monitor/linux/nscommacceptmon.h @@ -48,6 +48,7 @@ public: void monReqNewProcess( struct message_def* msg, int sockFd ); void monReqProcessInfo( struct message_def* msg, int sockFd ); void monReqProcessInfoCont( struct message_def* msg, int sockFd ); + void monReqProcessInfoNs( struct message_def* msg, int sockFd ); void monReqShutdown( struct message_def* msg, int sockFd ); void monReqUnknown( struct message_def* 
msg, int sockFd ); void startAccepting( void ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nsprocess.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsprocess.cxx b/core/sqf/monitor/linux/nsprocess.cxx index bbcb853..62523ae 100644 --- a/core/sqf/monitor/linux/nsprocess.cxx +++ b/core/sqf/monitor/linux/nsprocess.cxx @@ -95,20 +95,14 @@ void CProcess::CompleteProcessStartup (char *port, int os_pid, bool event_messag if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_PROCESS_DETAIL | TRACE_REQUEST_DETAIL)) trace_printf("%s@%d: process %s (%d, %d), preclone=%d" - ", clone=%d\n", + ", clone=%d, origPNidNs=%d, MyPNID=%d\n", method_name, __LINE__, Name, - Nid, os_pid, preclone, Clone); + Nid, os_pid, preclone, Clone, origPNidNs, MyPNID ); StartupCompleted = true; if (creation_time != NULL) CreationTime = *creation_time; - if ( MyPNID == GetOrigPNidNs() ) - { - // Replicate to other nodes - CReplClone *repl = new CReplClone(this); - Replicator.addItem(repl); - } - else + if ( MyPNID != GetOrigPNidNs() ) { Clone = true; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nsreqnewproc.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsreqnewproc.cxx b/core/sqf/monitor/linux/nsreqnewproc.cxx index 105ff95..92a7634 100644 --- a/core/sqf/monitor/linux/nsreqnewproc.cxx +++ b/core/sqf/monitor/linux/nsreqnewproc.cxx @@ -26,11 +26,14 @@ #include "nstype.h" #include <stdio.h> +#include "replicate.h" #include "reqqueue.h" #include "montrace.h" #include "monsonar.h" +extern int MyPNID; extern CNodeContainer *Nodes; +extern CReplicate Replicator; extern const char *ProcessTypeString( PROCESSTYPE type ); @@ -76,16 +79,13 @@ void CExtNewProcNsReq::performRequest() CNode *node; CLNode *parent_lnode; CNode *parent_node; - int result; + int result = MPI_SUCCESS; lnode = Nodes->GetLNode( nid_ 
); node = lnode->GetNode(); parent_lnode = Nodes->GetLNode( msg_->u.request.u.new_process_ns.parent_nid ); parent_node = NULL; if ( parent_lnode ) parent_node = parent_lnode->GetNode(); - strId_t pathStrId = node->GetStringId ( msg_->u.request.u.new_process_ns.path ); - strId_t ldpathStrId = node->GetStringId (msg_->u.request.u.new_process_ns.ldpath ); - strId_t programStrId = node->GetStringId ( msg_->u.request.u.new_process_ns.program ); CProcess *parent = NULL; if ( parent_node ) parent = parent_node->GetProcess( msg_->u.request.u.new_process_ns.parent_pid ); @@ -101,25 +101,56 @@ void CExtNewProcNsReq::performRequest() msg_->u.request.u.new_process_ns.backup, msg_->u.request.u.new_process_ns.unhooked, msg_->u.request.u.new_process_ns.process_name, - pathStrId, - ldpathStrId, - programStrId, + msg_->u.request.u.new_process_ns.pathStrId, + msg_->u.request.u.new_process_ns.ldpathStrId, + msg_->u.request.u.new_process_ns.programStrId, msg_->u.request.u.new_process_ns.infile, msg_->u.request.u.new_process_ns.outfile, result ); - process = process; // touch - // TODO replicate + if (process) + { + if ( MyPNID == process->GetOrigPNidNs() ) + { + process->userArgs ( msg_->u.request.u.new_process_ns.argc, + msg_->u.request.u.new_process_ns.argv ); - msg_->u.reply.type = ReplyType_NewProcessNs; - msg_->u.reply.u.new_process_ns.nid = msg_->u.request.u.new_process_ns.nid; - msg_->u.reply.u.new_process_ns.pid = msg_->u.request.u.new_process_ns.pid; - msg_->u.reply.u.new_process_ns.verifier = msg_->u.request.u.new_process_ns.verifier; - strncpy(msg_->u.reply.u.new_process_ns.process_name, msg_->u.request.u.new_process_ns.process_name, MAX_PROCESS_NAME); - msg_->u.reply.u.new_process_ns.return_code = MPI_SUCCESS; + process->CompleteProcessStartup( msg_->u.request.u.new_process_ns.port_name + , msg_->u.request.u.new_process_ns.pid + , msg_->u.request.u.new_process_ns.event_messages + , msg_->u.request.u.new_process_ns.system_messages + , false + , 
&msg_->u.request.u.new_process_ns.creation_time + , process->GetOrigPNidNs() ); - // Send reply to requester - monreply(msg_, sockFd_); + // Replicate to other nodes + CReplClone *repl = new CReplClone(process); + if (repl) + { + // we will not reply at this time ... but wait for + // node add to be processed in CIntNodeAddReq + + // Retain reference to requester's request buffer so can + // send completion message. + process->SetMonContext( msg_ ); + process->SetMonSockFd( sockFd_ ); + msg_->noreply = true; + + Replicator.addItem(repl); + } + } + } + else + { + char la_buf[MON_STRING_BUF_SIZE]; + msg_->u.reply.type = ReplyType_NewProcessNs; + msg_->u.reply.u.new_process_ns.return_code = MPI_ERR_SPAWN; + sprintf(la_buf, "[%s], Unsuccessful!\n", method_name); + mon_log_write(NS_EXTNEWPROCNSREQ_1, SQ_LOG_CRIT, la_buf); + + // Send reply to requester + monreply(msg_, sockFd_); + } TRACE_EXIT; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nsreqprocinfons.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsreqprocinfons.cxx b/core/sqf/monitor/linux/nsreqprocinfons.cxx new file mode 100644 index 0000000..3be8ddb --- /dev/null +++ b/core/sqf/monitor/linux/nsreqprocinfons.cxx @@ -0,0 +1,206 @@ +/////////////////////////////////////////////////////////////////////////////// +// +// @@@ START COPYRIGHT @@@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// @@@ END COPYRIGHT @@@ +// +/////////////////////////////////////////////////////////////////////////////// + +#include "nstype.h" + +#include <stdio.h> +#include "reqqueue.h" +#include "montrace.h" +#include "monsonar.h" +#include "monlogging.h" +#include "replicate.h" +#include "mlio.h" + +extern CMonitor *Monitor; +extern CMonStats *MonStats; +extern CNodeContainer *Nodes; +extern CReplicate Replicator; +extern int MyPNID; + +CExtProcInfoNsReq::CExtProcInfoNsReq( reqQueueMsg_t msgType, + int nid, int pid, int sockFd, + struct message_def *msg ) + : CExternalReq(msgType, nid, pid, sockFd, msg) +{ + // Add eyecatcher sequence as a debugging aid + memcpy(&eyecatcher_, "RqEB", 4); +} + +CExtProcInfoNsReq::~CExtProcInfoNsReq() +{ + // Alter eyecatcher sequence as a debugging aid to identify deleted object + memcpy(&eyecatcher_, "rQeb", 4); +} + +// Copy information for a specific process into the reply message buffer. 
+void CExtProcInfoNsReq::copyInfo(CProcess *process, ProcessInfoNs_reply_def &processInfo) +{ + CProcess *parent; + + processInfo.nid = process->GetNid(); + processInfo.pid = process->GetPid(); + processInfo.verifier = process->GetVerifier(); + strncpy( processInfo.process_name, process->GetName(), MAX_PROCESS_NAME ); + processInfo.type = process->GetType(); + + parent = process->GetParent(); + if (parent) + { + processInfo.parent_nid = parent->GetNid(); + processInfo.parent_pid = parent->GetPid(); + processInfo.parent_verifier = parent->GetVerifier(); +// strncpy(processInfo.parent_name, parent->GetName(), MAX_PROCESS_NAME ); + } + else + { + processInfo.parent_nid = -1; + processInfo.parent_pid = -1; + processInfo.parent_verifier = -1; +// processInfo.parent_name[0] = '\0'; + } + + processInfo.priority = process->GetPriority(); + processInfo.backup = process->IsBackup(); + processInfo.state = process->GetState(); + processInfo.unhooked = process->IsUnhooked(); + processInfo.event_messages = process->IsEventMessages(); + processInfo.system_messages = process->IsSystemMessages(); + strncpy( processInfo.program, process->program(), MAX_PROCESS_PATH ); + processInfo.pathStrId = process->pathStrId(); + processInfo.ldpathStrId = process->ldPathStrId(); + processInfo.programStrId = process->programStrId(); + strncpy( processInfo.port_name, process->GetPort(), MPI_MAX_PORT_NAME ); + processInfo.argc = process->argc(); + memcpy( processInfo.argv, process->userArgv(), process->userArgvLen() ); + strncpy( processInfo.infile, process->infile(), MAX_PROCESS_PATH ); + strncpy( processInfo.outfile, process->outfile(), MAX_PROCESS_PATH ); + processInfo.creation_time = process->GetCreationTime(); +} + +void CExtProcInfoNsReq::populateRequestString( void ) +{ + char strBuf[MON_STRING_BUF_SIZE/2] = { 0 }; + + snprintf( strBuf, sizeof(strBuf), + "ExtReq(%s) req #=%ld " + "requester(name=%s/nid=%d/pid=%d/os_pid=%d/verifier=%d) " + "target(name=%s/nid=%d/pid=%d/verifier=%d) 
pattern(name=%s)" + , CReqQueue::svcReqType[reqType_], getId() + , msg_->u.request.u.process_info.process_name + , msg_->u.request.u.process_info.nid + , msg_->u.request.u.process_info.pid + , pid_ + , msg_->u.request.u.process_info.verifier + , msg_->u.request.u.process_info.target_process_name + , msg_->u.request.u.process_info.target_nid + , msg_->u.request.u.process_info.target_pid + , msg_->u.request.u.process_info.target_verifier + , msg_->u.request.u.process_info.target_process_pattern ); + requestString_.assign( strBuf ); +} + +void CExtProcInfoNsReq::performRequest() +{ + CProcess *process = NULL; + + const char method_name[] = "CExtProcInfoNsReq::performRequest"; + TRACE_ENTRY; + +#if 0 // TODO + // Record statistics (sonar counters) + if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) + MonStats->req_type_kill_Incr(); +#endif + + nid_ = msg_->u.request.u.process_info.nid; + verifier_ = msg_->u.request.u.process_info.verifier; + processName_ = msg_->u.request.u.process_info.process_name; + + int target_nid = -1; + int target_pid = -1; + string target_process_name; + Verifier_t target_verifier = -1; + + target_nid = msg_->u.request.u.process_info.target_nid; + target_pid = msg_->u.request.u.process_info.target_pid; + target_process_name = (const char *) msg_->u.request.u.process_info.target_process_name; + target_verifier = msg_->u.request.u.process_info.target_verifier; + + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d request #%ld: ProcessInfoNs, for (%d, %d:%d), " + "process type=%d\n" + , method_name, __LINE__, id_ + , target_nid, target_pid, target_verifier + , msg_->u.request.u.process_info.type); + } + + if ( target_process_name.size() ) + { // find by name (don't check node state, don't check process state, not backup) + process = Nodes->GetProcess( target_process_name.c_str() + , target_verifier + , false, false, false ); + } + else + { // find by nid (don't check node state, don't check process 
state, backup is Ok) + process = Nodes->GetProcess( target_nid + , target_pid + , target_verifier + , false, false, true ); + } + + + if (process) + { + msg_->u.reply.type = ReplyType_ProcessInfoNs; + msg_->u.reply.u.process_info_ns.return_code = MPI_SUCCESS; + copyInfo( process, msg_->u.reply.u.process_info_ns ); + } + else + { + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + { + trace_printf( "%s@%d - Kill %s (%d, %d:%d) -- can't find target process\n" + , method_name, __LINE__ + , msg_->u.request.u.process_info.target_process_name + , msg_->u.request.u.process_info.target_nid + , msg_->u.request.u.process_info.target_pid + , msg_->u.request.u.process_info.target_verifier); + } + msg_->u.reply.type = ReplyType_ProcessInfoNs; + msg_->u.reply.u.process_info_ns.nid = target_nid; + msg_->u.reply.u.process_info_ns.pid = target_pid; + msg_->u.reply.u.process_info_ns.verifier = target_verifier; + strncpy(msg_->u.reply.u.process_info_ns.process_name, target_process_name.c_str(), MAX_PROCESS_NAME); + msg_->u.reply.u.process_info_ns.return_code = MPI_ERR_NAME; + if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) + trace_printf("%s@%d - unsuccessful\n", method_name, __LINE__); + } + + // Send reply to requester + monreply(msg_, sockFd_); + + TRACE_EXIT; +} http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/nsreqqueue.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/nsreqqueue.cxx b/core/sqf/monitor/linux/nsreqqueue.cxx index aa92ead..e189929 100644 --- a/core/sqf/monitor/linux/nsreqqueue.cxx +++ b/core/sqf/monitor/linux/nsreqqueue.cxx @@ -77,6 +77,16 @@ void CRequest::monreply(struct message_def *msg, int sockFd, int *error) msg->u.reply.u.process_info.return_code); } break; + case ReplyType_ProcessInfoNs: + size += sizeof(struct ProcessInfoNs_reply_def); + if (trace_settings & (TRACE_PROCESS_DETAIL)) + { + trace_printf("%s@%d reply type=%d(ProcessInfoNs), size=%d, 
sock=%d\n", method_name, __LINE__, + msg->u.reply.type, size, sockFd); + trace_printf("%s@%d process-info reply. rc=%d\n", method_name, __LINE__, + msg->u.reply.u.process_info.return_code); + } + break; default: if (trace_settings & (TRACE_PROCESS_DETAIL)) { http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/pnode.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx index 997b2a5..e3923a0 100644 --- a/core/sqf/monitor/linux/pnode.cxx +++ b/core/sqf/monitor/linux/pnode.cxx @@ -1639,6 +1639,39 @@ void CNodeContainer::AddedNode( CNode *node ) TRACE_EXIT; } +CProcess *CNodeContainer::AddCloneProcess( ProcessInfoNs_reply_def *processInfo ) +{ + const char method_name[] = "CNodeContainer::AddNode"; + TRACE_ENTRY; + + CLNode *lnode = Nodes->GetLNode(processInfo->nid); + CNode *node = lnode->GetNode(); + CProcess *process = node->CloneProcess( processInfo->nid + , processInfo->type + , processInfo->priority + , processInfo->backup + , processInfo->unhooked + , processInfo->process_name + , processInfo->port_name + , processInfo->pid + , processInfo->verifier + , processInfo->parent_nid + , processInfo->parent_pid + , processInfo->parent_verifier + , processInfo->event_messages + , processInfo->system_messages + , processInfo->pathStrId + , processInfo->ldpathStrId + , processInfo->programStrId + , processInfo->infile + , processInfo->outfile + , &processInfo->creation_time + , -1 );//processInfo->origPNidNs_); + + TRACE_EXIT; + return(process); +} + CNode *CNodeContainer::AddNode( int pnid ) { const char method_name[] = "CNodeContainer::AddNode"; @@ -2914,6 +2947,122 @@ CProcess *CNodeContainer::GetProcessByName( const char *name, bool checkstate ) } #ifndef NAMESERVER_PROCESS +CProcess *CNodeContainer::GetProcessNs( int nid + , int pid + , Verifier_t verifier ) +{ + const char method_name[] = "CNodeContainer::GetProcessNs"; + 
TRACE_ENTRY; + + CProcess *process = NULL; + + struct message_def msg; + memset(&msg, 0, sizeof(msg) ); // TODO: remove! + msg.type = MsgType_Service; + msg.noreply = false; + msg.reply_tag = REPLY_TAG; + msg.u.request.type = ReqType_ProcessInfoNs; + struct ProcessInfo_def *processInfo = &msg.u.request.u.process_info; + processInfo->nid = -1; + processInfo->pid = -1; + processInfo->verifier = -1; + processInfo->process_name[0] = 0; + processInfo->target_nid = nid; + processInfo->target_pid = pid; + processInfo->target_verifier = verifier; + processInfo->target_process_name[0] = 0; + + int error = NameServer->ProcessInfoNs(&msg); // in reqQueue thread (CExternalReq) + if (error == 0) + { + if ( (msg.type == MsgType_Service) && + (msg.u.reply.type == ReplyType_ProcessInfoNs) ) + { + if ( msg.u.reply.u.process_info_ns.return_code == MPI_SUCCESS ) + { + process = AddCloneProcess( &msg.u.reply.u.process_info_ns ); + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), + "[%s] ProcessInfo failed, rc=%d\n" + , method_name, msg.u.reply.u.process_info_ns.return_code ); + mon_log_write( MON_NODE_GETPROCESSNS_1, SQ_LOG_ERR, buf ); + } + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), + "[%s], Invalid MsgType(%d)/ReplyType(%d) for " + "ProcessInfoNs\n" + , method_name, msg.type, msg.u.reply.type ); + mon_log_write( MON_NODE_GETPROCESSNS_2, SQ_LOG_ERR, buf ); + } + } + + TRACE_EXIT; + return( process ); +} + +CProcess *CNodeContainer::GetProcessNs( const char *name, Verifier_t verifier ) +{ + const char method_name[] = "CNodeContainer::GetProcessNs"; + TRACE_ENTRY; + + CProcess *process = NULL; + + struct message_def msg; + memset(&msg, 0, sizeof(msg) ); // TODO: remove! 
+ msg.type = MsgType_Service; + msg.noreply = false; + msg.reply_tag = REPLY_TAG; + msg.u.request.type = ReqType_ProcessInfoNs; + struct ProcessInfo_def *processInfo = &msg.u.request.u.process_info; + processInfo->nid = -1; + processInfo->pid = -1; + processInfo->verifier = -1; + processInfo->process_name[0] = 0; + processInfo->target_nid = -1; + processInfo->target_pid = -1; + processInfo->target_verifier = verifier; + STRCPY( processInfo->target_process_name, name); + + int error = NameServer->ProcessInfoNs(&msg); // in reqQueue thread (CExternalReq) + if (error == 0) + { + if ( (msg.type == MsgType_Service) && + (msg.u.reply.type == ReplyType_ProcessInfoNs) ) + { + if ( msg.u.reply.u.process_info_ns.return_code == MPI_SUCCESS ) + { + process = AddCloneProcess( &msg.u.reply.u.process_info_ns ); + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), + "[%s] ProcessInfo failed, rc=%d\n" + , method_name, msg.u.reply.u.process_info_ns.return_code ); + mon_log_write( MON_NODE_GETPROCESSNS_3, SQ_LOG_ERR, buf ); + } + } + else + { + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf), + "[%s], Invalid MsgType(%d)/ReplyType(%d) for " + "ProcessInfo\n" + , method_name, msg.type, msg.u.reply.type ); + mon_log_write( MON_NODE_GETPROCESSNS_4, SQ_LOG_ERR, buf ); + } + } + + TRACE_EXIT; + return( process ); +} + SyncState CNodeContainer::GetTmState ( SyncState check_state ) { SyncState state = check_state; http://git-wip-us.apache.org/repos/asf/trafodion/blob/abf3c429/core/sqf/monitor/linux/pnode.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/pnode.h b/core/sqf/monitor/linux/pnode.h index 39b012b..48af3bc 100644 --- a/core/sqf/monitor/linux/pnode.h +++ b/core/sqf/monitor/linux/pnode.h @@ -66,6 +66,7 @@ public: ~CNodeContainer( void ); void AddedNode( CNode *node ); + CProcess *AddCloneProcess( ProcessInfoNs_reply_def *processInfo ); CNode *AddNode( int pnid ); void AddNodes( void ); 
void AddToSpareNodesList( int pnid ); @@ -111,6 +112,11 @@ public: , bool checkstate=true , bool backupOk=false ); CProcess *GetProcessByName( const char *name, bool checkstate=true ); + CProcess *GetProcessNs( int nid + , int pid + , Verifier_t verifier ); + CProcess *GetProcessNs( const char *name + , Verifier_t verifier ); SyncState GetTmState( SyncState check_state ); CNode *GetZoneNode( int zid );
