[TRAFODION-2881] HA fixes. Fixed multiple problems in the monitor's Allgather() socket reconnect logic: - Separated node-down detection logic from communication errors and timeouts to better handle multiple failure scenarios - Better handling of network resets - Additional trace information - Fixed a 'node up' hang in the monitor shell caused by a TmSync race condition
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/e832d827 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/e832d827 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/e832d827 Branch: refs/heads/master Commit: e832d827507521998567d4cc5d92e4239007d19a Parents: db319b1 Author: Zalo Correa <[email protected]> Authored: Thu Jan 11 09:32:11 2018 -0800 Committer: Zalo Correa <[email protected]> Committed: Thu Jan 11 09:32:11 2018 -0800 ---------------------------------------------------------------------- .../export/include/common/evl_sqlog_eventnum.h | 26 + core/sqf/monitor/linux/cluster.cxx | 967 ++++++++++--------- core/sqf/monitor/linux/cluster.h | 6 +- core/sqf/monitor/linux/cmsh.cxx | 146 ++- core/sqf/monitor/linux/cmsh.h | 2 + core/sqf/monitor/linux/commaccept.cxx | 37 +- core/sqf/monitor/linux/commaccept.h | 3 +- core/sqf/monitor/linux/internal.h | 2 +- core/sqf/monitor/linux/macros.gmk | 4 +- core/sqf/monitor/linux/montest_run.virtual | 10 +- core/sqf/monitor/linux/pnode.cxx | 8 +- core/sqf/monitor/linux/pnodeconfig.cxx | 33 +- core/sqf/monitor/linux/pnodeconfig.h | 2 + core/sqf/monitor/linux/process.cxx | 161 +-- core/sqf/monitor/linux/redirector.cxx | 11 +- core/sqf/monitor/linux/redirector.h | 2 +- core/sqf/monitor/linux/reqexit.cxx | 7 +- core/sqf/monitor/linux/reqnewproc.cxx | 30 +- core/sqf/monitor/linux/reqopen.cxx | 8 +- core/sqf/monitor/linux/reqqueue.cxx | 5 + core/sqf/monitor/linux/shell.cxx | 101 +- core/sqf/monitor/linux/tcdbsqlite.cxx | 8 - core/sqf/monitor/linux/tmsync.cxx | 46 +- core/sqf/monitor/linux/zclient.cxx | 6 +- core/sqf/sql/scripts/sqnodestatus | 23 +- core/sqf/src/seabed/src/msmon.cpp | 2 +- 26 files changed, 1010 insertions(+), 646 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/export/include/common/evl_sqlog_eventnum.h 
---------------------------------------------------------------------- diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h index c8b4d59..96c3df9 100644 --- a/core/sqf/export/include/common/evl_sqlog_eventnum.h +++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h @@ -54,6 +54,7 @@ #define MON_CLUSTER_CLUSTER_1 101010401 #define MON_CLUSTER_CLUSTER_2 101010402 #define MON_CLUSTER_UCLUSTER 101010501 + #define MON_CLUSTER_HANDLEOTHERNODE_1 101010601 #define MON_CLUSTER_HANDLEOTHERNODE_2 101010602 #define MON_CLUSTER_HANDLEOTHERNODE_3 101010603 @@ -65,17 +66,21 @@ #define MON_CLUSTER_HANDLEOTHERNODE_9 101010609 #define MON_CLUSTER_HANDLEOTHERNODE_10 101010610 #define MON_CLUSTER_HANDLEOTHERNODE_11 101010611 + #define MON_CLUSTER_HANDLEMYNODE_1 101010701 #define MON_CLUSTER_HANDLEMYNODE_2 101010702 #define MON_CLUSTER_HANDLEMYNODE_3 101010703 #define MON_CLUSTER_HANDLEMYNODE_4 101010704 #define MON_CLUSTER_HANDLEMYNODE_5 101010705 #define MON_CLUSTER_HANDLEMYNODE_6 101010706 + #define MON_CLUSTER_CHECKWHODIED_1 101010801 #define MON_CLUSTER_CHECKWHODIED_2 101010802 + #define MON_CLUSTER_REGROUP_1 101010901 #define MON_CLUSTER_REGROUP_2 101010902 #define MON_CLUSTER_INITCLUSTER 101011001 + #define MON_CLUSTER_MARKDOWN_1 101011101 #define MON_CLUSTER_MARKDOWN_2 101011102 #define MON_CLUSTER_MARKDOWN_3 101011103 @@ -84,15 +89,18 @@ #define MON_CLUSTER_FORCEDOWN_1 101011401 #define MON_CLUSTER_FORCEDOWN_2 101011402 #define MON_CLUSTER_CLUSTER_MANAGER 101011501 + #define MON_CLUSTER_EXPEDITEDOWN_1 101011601 #define MON_CLUSTER_EXPEDITEDOWN_2 101011602 #define MON_CLUSTER_RESPONSIVE_1 101011701 #define MON_CLUSTER_RESPONSIVE_2 101011702 #define MON_CLUSTER_RESPONSIVE_3 101011703 + #define MON_CLUSTER_CONNTONEWMON_1 101011801 #define MON_CLUSTER_CONNTONEWMON_2 101011802 #define MON_CLUSTER_CONNTONEWMON_10 101011810 #define MON_CLUSTER_CONNTONEWMON_11 101011811 + #define MON_CLUSTER_MERGETONEWMON_1 
101011901 #define MON_CLUSTER_MERGETONEWMON_2 101011902 #define MON_CLUSTER_MERGETONEWMON_3 101011903 @@ -100,6 +108,7 @@ #define MON_CLUSTER_MERGETONEWMON_11 101011911 #define MON_CLUSTER_MERGETONEWMON_12 101011912 #define MON_CLUSTER_MERGETONEWMON_13 101011913 + #define MON_CLUSTER_REINTEGRATE_1 101012001 #define MON_CLUSTER_REINTEGRATE_2 101012002 #define MON_CLUSTER_REINTEGRATE_3 101012003 @@ -107,12 +116,15 @@ #define MON_CLUSTER_REINTEGRATE_11 101012011 #define MON_CLUSTER_REINTEGRATE_12 101012012 #define MON_CLUSTER_REINTEGRATE_13 101012013 + #define MON_CLUSTER_REINITCLUSTER_1 101012101 #define MON_CLUSTER_REINITCLUSTER_2 101012102 #define MON_CLUSTER_REINITCLUSTER_3 101012103 #define MON_CLUSTER_REINITCLUSTER_4 101012104 #define MON_CLUSTER_REINITCLUSTER_5 101012105 + #define MON_CLUSTER_CHECKWHOJOINED_1 101012201 + #define MON_CLUSTER_UPDDATECLUSTER_1 101012301 #define MON_CLUSTER_UPDDATECLUSTER_2 101012302 #define MON_CLUSTER_UPDTCLUSTERSTATE_1 101012401 @@ -131,11 +143,14 @@ #define MON_CLUSTER_INITCONFIGCLUSTER_2 101013002 #define MON_CLUSTER_INITCONFIGCLUSTER_3 101013003 #define MON_CLUSTER_INITCONFIGCLUSTER_4 101013004 + #define MON_CLUSTER_SETNEWCOMM_1 101013101 #define MON_CLUSTER_SETNEWCOMM_2 101013102 + #define MON_CLUSTER_SETNEWSOCK_1 101013201 #define MON_CLUSTER_SETNEWSOCK_2 101013202 #define MON_CLUSTER_SETNEWSOCK_3 101013203 + #define MON_CLUSTER_ALLGATHERSOCK_1 101013301 #define MON_CLUSTER_ALLGATHERSOCK_2 101013302 #define MON_CLUSTER_ALLGATHERSOCK_3 101013303 @@ -145,6 +160,7 @@ #define MON_CLUSTER_ALLGATHERSOCK_7 101013307 #define MON_CLUSTER_ALLGATHERSOCK_8 101013308 #define MON_CLUSTER_EPOLLCTL_1 101013401 + #define MON_CLUSTER_INITCLUSTERSOCKS_1 101013501 #define MON_CLUSTER_INITCLUSTERSOCKS_2 101013502 #define MON_CLUSTER_INITCLUSTERSOCKS_3 101013503 @@ -155,6 +171,7 @@ #define MON_CLUSTER_INITCLUSTERSOCKS_8 101013508 #define MON_CLUSTER_INITACCEPTSOCK_1 101013601 #define MON_CLUSTER_INITACCEPTSOCK_2 101013602 + #define 
MON_CLUSTER_MKSRVSOCK_1 101013701 #define MON_CLUSTER_MKSRVSOCK_2 101013702 #define MON_CLUSTER_MKSRVSOCK_3 101013703 @@ -163,6 +180,7 @@ #define MON_CLUSTER_MKSRVSOCK_6 101013706 #define MON_CLUSTER_MKSRVSOCK_7 101013707 #define MON_CLUSTER_MKSRVSOCK_8 101013708 + #define MON_CLUSTER_MKCLTSOCK_1 101013801 #define MON_CLUSTER_MKCLTSOCK_2 101013802 #define MON_CLUSTER_MKCLTSOCK_3 101013803 @@ -175,29 +193,37 @@ #define MON_CLUSTER_MKCLTSOCK_10 101013810 #define MON_CLUSTER_MKCLTSOCK_11 101013811 #define MON_CLUSTER_MKCLTSOCK_12 101013812 + #define MON_CLUSTER_CONNECT_1 101013901 #define MON_CLUSTER_CONNECT_2 101013902 #define MON_CLUSTER_CONNECT_3 101013903 #define MON_CLUSTER_CONNECT_4 101013904 #define MON_CLUSTER_CONNECT_5 101013905 + #define MON_CLUSTER_CONNECTTOSELF_1 101014001 #define MON_CLUSTER_CONNECTTOSELF_2 101014002 #define MON_CLUSTER_CONNECTTOSELF_3 101014003 + #define MON_CLUSTER_ACCEPTSOCK_1 101014101 #define MON_CLUSTER_ACCEPTSOCK_2 101014102 #define MON_CLUSTER_ACCEPTSOCK_3 101014103 + #define MON_CLUSTER_INITSERVERSOCK_1 101014201 #define MON_CLUSTER_INITSERVERSOCK_2 101014202 #define MON_CLUSTER_INITSERVERSOCK_3 101014203 #define MON_CLUSTER_INITSERVERSOCK_4 101014204 + #define MON_CLUSTER_SOFTNODEDOWN_1 101014301 #define MON_CLUSTER_SOFTNODEDOWN_2 101014302 #define MON_CLUSTER_SOFTNODEDOWN_3 101014303 + #define MON_CLUSTER_SOFTNODEUP_1 101014401 + #define MON_CLUSTER_SETKEEPALIVESOCKOPT_1 101014501 #define MON_CLUSTER_SETKEEPALIVESOCKOPT_2 101014502 #define MON_CLUSTER_SETKEEPALIVESOCKOPT_3 101014503 #define MON_CLUSTER_SETKEEPALIVESOCKOPT_4 101014504 + #define MON_CLUSTER_NO_LICENSE_VERIFIERS 101014601 #define MON_CLUSTER_ALLGATHERSOCKRECONN_1 101014701 http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cluster.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx index 66f515c..c22e8ea 100644 --- 
a/core/sqf/monitor/linux/cluster.cxx +++ b/core/sqf/monitor/linux/cluster.cxx @@ -347,10 +347,6 @@ void CCluster::NodeReady( CNode *spareNode ) } spareNode->SetActivatingSpare( false ); - if ( MyNode->IsCreator() ) - { - MyNode->SetCreator( false, -1, -1 ); - } ResetIntegratingPNid(); TRACE_EXIT; @@ -848,10 +844,6 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state) { if ( node->GetPNid() == integratingPNid_ ) { - if ( MyNode->IsCreator() ) - { - MyNode->SetCreator( false, -1, -1 ); - } ResetIntegratingPNid(); } node->KillAllDown(); @@ -1470,10 +1462,6 @@ int CCluster::HardNodeUp( int pnid, char *node_name ) } } - if ( MyNode->IsCreator() ) - { - MyNode->SetCreator( false, -1, -1 ); - } ResetIntegratingPNid(); if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) @@ -3394,6 +3382,18 @@ bool CCluster::PingSockPeer(CNode *node) pingSock = Monitor->Connect( node->GetCommPort() ); if ( pingSock < 0 ) { + if (node->GetState() != State_Up) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Node %s (%d) is not up, " + "socks_[%d]=%d\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() + , node->GetPNid(), socks_[node->GetPNid()] ); + } + break; + } sleep( MAX_RECONN_PING_WAIT_TIMEOUT ); } else @@ -3828,12 +3828,13 @@ void CCluster::ReIntegrateSock( int initProblem ) myNodeInfo.creator = true; myNodeInfo.creatorShellPid = CreatorShellPid; myNodeInfo.creatorShellVerifier = CreatorShellVerifier; + myNodeInfo.ping = false; if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { trace_printf( "%s@%d - Connected to creator monitor, sending my info, " "node %d (%s), commPort=%s, syncPort=%s, creator=%d, " - "creatorShellPid=%d:%d\n" + "creatorShellPid=%d:%d, ping=%d\n" , method_name, __LINE__ , myNodeInfo.pnid , myNodeInfo.nodeName @@ -3841,7 +3842,8 @@ void CCluster::ReIntegrateSock( int initProblem ) , myNodeInfo.syncPort , myNodeInfo.creator , myNodeInfo.creatorShellPid - , myNodeInfo.creatorShellVerifier ); + , 
myNodeInfo.creatorShellVerifier + , myNodeInfo.ping ); } rc = Monitor->SendSock( (char *) &myNodeInfo @@ -3907,6 +3909,7 @@ void CCluster::ReIntegrateSock( int initProblem ) myNodeInfo.creator = false; myNodeInfo.creatorShellPid = -1; myNodeInfo.creatorShellVerifier = -1; + myNodeInfo.ping = false; for (int i=0; i<pnodeCount; i++) { if ( nodeInfo[i].creatorPNid != -1 && @@ -4237,9 +4240,36 @@ void CCluster::ResetIntegratingPNid( void ) abort(); } + if ( MyNode->IsCreator() ) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Resetting creator pnid=%d\n", + method_name, __LINE__, MyPNID ); + } + + MyNode->SetCreator( false, -1, -1 ); + } + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Resetting integratingPNid_=%d\n", + method_name, __LINE__, integratingPNid_ ); + } + integratingPNid_ = -1; - // Indicate to the commAcceptor thread to begin accepting connections - CommAccept.setAccepting( true ); + + if (!CommAccept.isAccepting()) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Triggering commAcceptor thread to begin accepting connections\n", + method_name, __LINE__ ); + } + + // Indicate to the commAcceptor thread to begin accepting connections + CommAccept.startAccepting(); + } TRACE_EXIT; } @@ -4251,7 +4281,7 @@ void CCluster::SetIntegratingPNid( int pnid ) integratingPNid_ = pnid; // Indicate to the commAcceptor thread to stop accepting connections - CommAccept.setAccepting( false ); + CommAccept.stopAccepting(); TRACE_EXIT; } @@ -4643,7 +4673,6 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St peer_t p[GetConfigPNodesMax()]; memset( p, 0, sizeof(p) ); tag = 0; // make compiler happy - struct timespec currentTime; // Set to twice the ZClient session timeout static int sessionTimeout = ZClientEnabled ? 
(ZClient->GetSessionTimeout() * 2) : 120; @@ -4740,10 +4769,14 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St while ( 1 ) { reconnected: + bool checkConnections = false; + bool doReconnect = false; + bool resetConnections = false; + int peerTimedoutCount = 0; int maxEvents = 2*GetConfigPNodesCount() - nsent - nrecv; if ( maxEvents == 0 ) break; int nw; - int zerr = ZOK; + peer_t *peer; while ( 1 ) { @@ -4754,321 +4787,180 @@ reconnected: if ( nw == 0 ) { // Timeout, no fd's ready for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ ) + { // Check no IO completion on peers + peer = &p[indexToPnid_[iPeer]]; + if ( (peer->p_receiving) || (peer->p_sending) ) + { + peerTimedoutCount++; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - EPOLL timeout (%d) on: %s(%d), " + "socks_[%d]=%d, " + "peer->p_sending=%d, " + "peer->p_receiving=%d\n" + , method_name, __LINE__ + , peerTimedoutCount + , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer] + , indexToPnid_[iPeer] + , socks_[indexToPnid_[iPeer]] + , peer->p_sending + , peer->p_receiving ); + } + + if (peer->p_initial_check && !reconnecting) + { // Set the session timeout relative to now + peer->p_initial_check = false; + clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime); + peer->znodeFailedTime.tv_sec += sessionTimeout; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n" + , method_name, __LINE__ + , peer->znodeFailedTime.tv_sec); + } + } + + if ( IsRealCluster && peer->p_timeout_count < sv_epoll_retry_count ) + { + peer->p_timeout_count++; + checkConnections = true; + if (peer->p_timeout_count == sv_epoll_retry_count) + { + resetConnections = true; + } + } + else + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d" " - Peer timed out: %s(%d), " + "socks_[%d]=%d, " + "peer->p_timeout_count=%d\n" + , method_name, __LINE__ + , 
Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer] + , indexToPnid_[iPeer] + , socks_[indexToPnid_[iPeer]] + , peer->p_timeout_count ); + } + } + } + } // Check no IO completion on peers + + if (checkConnections) { - peer_t *peer = &p[indexToPnid_[iPeer]]; - if ( (indexToPnid_[iPeer] != MyPNID) && (socks_[indexToPnid_[iPeer]] != -1) ) + checkConnections = false; + if (trace_settings & TRACE_RECOVERY) { - if ( (peer->p_receiving) || (peer->p_sending) ) - { - if ( ! ZClientEnabled ) + trace_printf( "%s@%d - Initianing AllgatherSockReconnect()," + " peerTimedoutCount=%d\n" + , method_name, __LINE__ + , peerTimedoutCount ); + } + // First, check ability to connect to all peers + // An err returned will mean that connect failed with + // at least one peer. No err implies that possible network + // reset occurred and there is probably one dead connection + // to a peer where no IOs will complete ever, so connections + // to all peers must be reestablished. + err = AllgatherSockReconnect( stats, false ); + if (err == MPI_SUCCESS) + { // Connections to all peers are good + if (resetConnections) + { // Establish new connections on all peers + resetConnections = false; + err = AllgatherSockReconnect( stats, true ); + // Redrive IOs on new peer connections + nsent = 0; nrecv = 0; + for ( int i = 0; i < GetConfigPNodesCount(); i++ ) { - if (peer->p_initial_check && !reconnecting) - { - peer->p_initial_check = false; - clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime); - peer->znodeFailedTime.tv_sec += sessionTimeout; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n" - , method_name, __LINE__ - , peer->znodeFailedTime.tv_sec); - } + peer = &p[indexToPnid_[i]]; + if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 ) + { // peer is me or not available + peer->p_sending = peer->p_receiving = false; + nsent++; + nrecv++; } - - if ( peer->p_timeout_count < sv_epoll_retry_count ) + else { - 
peer->p_timeout_count++; - - if (IsRealCluster) - { - if (trace_settings & TRACE_RECOVERY) - { - trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d)," - " timeout count=%d," - " sending=%d," - " receiving=%d\n" - , method_name, __LINE__ - , Node[indexToPnid_[iPeer]]->GetName(), iPeer - , peer->p_timeout_count - , peer->p_sending - , peer->p_receiving); - } - // Attempt reconnect to all peers - err = AllgatherSockReconnect( stats ); - // Redrive IOs on live peer connections - nsent = 0; nrecv = 0; - for ( int i = 0; i < GetConfigPNodesCount(); i++ ) - { - peer_t *peer = &p[indexToPnid_[i]]; - if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 ) - { - peer->p_sending = peer->p_receiving = false; - nsent++; - nrecv++; - } - else - { - peer->p_sending = peer->p_receiving = true; - peer->p_sent = peer->p_received = 0; - peer->p_timeout_count = 0; - peer->p_n2recv = -1; - peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize); - - struct epoll_event event; - event.data.fd = socks_[indexToPnid_[i]]; - event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP; - EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event ); - } - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - socks_[%d]=%d, " - "peer->p_sending=%d, " - "peer->p_receiving=%d\n" - , method_name, __LINE__ - , indexToPnid_[i] - , socks_[indexToPnid_[i]] - , peer->p_sending - , peer->p_receiving ); - } - } - reconnectSeqNum_ = seqNum_; - reconnecting = true; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - Reconnecting!\n" - , method_name, __LINE__ ); - } - goto reconnected; - } - continue; + peer->p_sending = peer->p_receiving = true; + peer->p_sent = peer->p_received = 0; + peer->p_n2recv = -1; + peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize); + struct epoll_event event; + event.data.fd = socks_[indexToPnid_[i]]; + event.events = EPOLLIN | 
EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP; + EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event ); } - if (trace_settings & TRACE_RECOVERY) + } + } // (resetConnections) + } // (err == MPI_SUCCESS) + else + { + for ( int i = 0; i < GetConfigPNodesCount(); i++ ) + { + peer = &p[indexToPnid_[i]]; + if ( indexToPnid_[i] != MyPNID && socks_[indexToPnid_[i]] == -1 ) + { // peer is me or no longer available + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY) && + (peer->p_sending || peer->p_receiving) ) { - trace_printf( "%s@%d - Peer timeout triggered: " - "peer->p_timeout_count %d, " - "sv_epoll_retry_count %d\n" - " socks_[%d]=%d\n" - " stats[%d].MPI_ERROR=%s\n" + trace_printf( "%s@%d No IO completion on %s(%d):socks_[%d]=%d, " + "peer->p_sending=%d, " + "peer->p_receiving=%d\n" , method_name, __LINE__ - , peer->p_timeout_count - , sv_epoll_retry_count - , indexToPnid_[iPeer] - , socks_[indexToPnid_[iPeer]] - , iPeer - , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) ); + , Node[indexToPnid_[i]]->GetName(), indexToPnid_[i] + , indexToPnid_[i] + , socks_[indexToPnid_[i]] + , peer->p_sending + , peer->p_receiving ); } - } - else - { - if (peer->p_initial_check && !reconnecting) + if (peer->p_sending) { - peer->p_initial_check = false; - clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime); - peer->znodeFailedTime.tv_sec += sessionTimeout; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n" - , method_name, __LINE__ - , peer->znodeFailedTime.tv_sec); - } - + nsent++; + peer->p_sending = false; } - // If not expired, stay in the loop - if ( ! ZClient->IsZNodeExpired( Node[indexToPnid_[iPeer]]->GetName(), zerr )) + if (peer->p_receiving) { - if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT ) - { - // Ignore transient errors with the quorum. - // However, if longer than the session - // timeout, handle it as a hard error. 
- clock_gettime(CLOCK_REALTIME, ¤tTime); - if (currentTime.tv_sec < peer->znodeFailedTime.tv_sec) - { - // Failsafe - peer->p_timeout_count++; - - if ( peer->p_timeout_count < sv_epoll_retry_count ) - { - if (IsRealCluster) - { - if (trace_settings & TRACE_RECOVERY) - { - trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d)," - " timeout count=%d," - " sending=%d," - " receiving=%d\n" - , method_name, __LINE__ - , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer] - , peer->p_timeout_count - , peer->p_sending - , peer->p_receiving); - } - // Attempt reconnect to all peers - err = AllgatherSockReconnect( stats ); - // Redrive IOs on live peer connections - nsent = 0; nrecv = 0; - for ( int i = 0; i < GetConfigPNodesCount(); i++ ) - { - peer_t *peer = &p[indexToPnid_[i]]; - if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 ) - { - peer->p_sending = peer->p_receiving = false; - nsent++; - nrecv++; - } - else - { - peer->p_sending = peer->p_receiving = true; - peer->p_sent = peer->p_received = 0; - peer->p_timeout_count = 0; - peer->p_n2recv = -1; - peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize); - - struct epoll_event event; - event.data.fd = socks_[indexToPnid_[i]]; - event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP; - EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event ); - } - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - socks_[%d]=%d, " - "peer->p_sending=%d, " - "peer->p_receiving=%d\n" - , method_name, __LINE__ - , indexToPnid_[i] - , socks_[indexToPnid_[i]] - , peer->p_sending - , peer->p_receiving ); - } - } - reconnectSeqNum_ = seqNum_; - reconnecting = true; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - Reconnecting!\n" - , method_name, __LINE__ ); - } - goto reconnected; - } - continue; - } - } - if (trace_settings & TRACE_RECOVERY) - { - trace_printf( "%s@%d - Znode Failed triggered\n" - 
" Current Time %ld(secs)\n" - " Znode Fail Time %ld(secs)\n" - , method_name, __LINE__ - , currentTime.tv_sec - , peer->znodeFailedTime.tv_sec); - } - } - else - { - // Failsafe - peer->p_timeout_count++; - - if ( peer->p_timeout_count < sv_epoll_retry_count ) - { - if (IsRealCluster) - { - if (trace_settings & TRACE_RECOVERY) - { - trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d)," - " timeout count=%d," - " sending=%d," - " receiving=%d\n" - , method_name, __LINE__ - , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer] - , peer->p_timeout_count - , peer->p_sending - , peer->p_receiving); - } - // Attempt reconnect to all peers - err = AllgatherSockReconnect( stats ); - // Redrive IOs on live peer connections - nsent = 0; nrecv = 0; - for ( int i = 0; i < GetConfigPNodesCount(); i++ ) - { - peer_t *peer = &p[indexToPnid_[i]]; - if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 ) - { - peer->p_sending = peer->p_receiving = false; - nsent++; - nrecv++; - } - else - { - peer->p_sending = peer->p_receiving = true; - peer->p_sent = peer->p_received = 0; - peer->p_timeout_count = 0; - peer->p_n2recv = -1; - peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize); - - struct epoll_event event; - event.data.fd = socks_[indexToPnid_[i]]; - event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP; - EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event ); - } - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - socks_[%d]=%d, " - "peer->p_sending=%d, " - "peer->p_receiving=%d\n" - , method_name, __LINE__ - , indexToPnid_[i] - , socks_[indexToPnid_[i]] - , peer->p_sending - , peer->p_receiving ); - } - } - reconnectSeqNum_ = seqNum_; - reconnecting = true; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d" " - Reconnecting!\n" - , method_name, __LINE__ ); - } - goto reconnected; - } - continue; - } - } + peer->p_receiving = false; + 
nrecv++; } } + } + } + doReconnect = true; + } // (checkConnections) - if (trace_settings & TRACE_RECOVERY) - { - trace_printf( "%s@%d - err=%d, socks_[%d]=%d, stats[%d].MPI_ERROR=%s\n" - , method_name, __LINE__ - , err - , indexToPnid_[iPeer] - , socks_[indexToPnid_[iPeer]] - , indexToPnid_[iPeer] - , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) ); - } + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + for ( int i = 0; i < GetConfigPNodesCount(); i++ ) + { + peer = &p[indexToPnid_[i]]; + trace_printf( "%s@%d doReconnect=%d, %s(%d):socks_[%d]=%d, " + "peer->p_sending=%d, " + "peer->p_receiving=%d\n" + , method_name, __LINE__ + , doReconnect + , Node[indexToPnid_[i]]->GetName(), indexToPnid_[i] + , indexToPnid_[i] + , socks_[indexToPnid_[i]] + , peer->p_sending + , peer->p_receiving ); + } + } - if ( err == MPI_ERR_IN_STATUS - && stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED) - { - // At this point, this peer is not responding and - // reconnects failed or its znode expired - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s@%d] Not heard from peer=%d (node=%s) " - "(current seq # is %lld)\n" - , method_name - , __LINE__ - , indexToPnid_[iPeer] - , Node[indexToPnid_[iPeer]]->GetName() - , seqNum_ ); - mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf ); - } - } + if (doReconnect) + { + reconnectSeqNum_ = seqNum_; + reconnecting = true; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d" " - Reconnecting! 
(reconnectSeqNum_=%lld)\n" + , method_name, __LINE__, reconnectSeqNum_ ); } + goto reconnected; } - } - + } // ( nw == 0 ) + if ( nw < 0 ) { // Got an error char ebuff[256]; @@ -5393,7 +5285,7 @@ early_exit: return err; } -int CCluster::AllgatherSockReconnect( MPI_Status *stats ) +int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnections ) { const char method_name[] = "CCluster::AllgatherSockReconnect"; TRACE_ENTRY; @@ -5428,6 +5320,18 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) , node->GetName(), node->GetPNid() , idst, socks_[idst] ); } + stats[idst].MPI_ERROR = MPI_ERR_EXITED; + stats[idst].count = 0; + err = MPI_ERR_IN_STATUS; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Setting Node %s (%d) status to " + "stats[%d].MPI_ERROR=%s\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() + , idst + , ErrorMsg(stats[idst].MPI_ERROR) ); + } // Remove old socket from epoll set, it may not be there struct epoll_event event; event.data.fd = socks_[idst]; @@ -5437,22 +5341,28 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) } continue; } - reconnectSock = ConnectSockPeer( node, idst ); - if (reconnectSock == -1) + if (PingSockPeer(node)) { - stats[idst].MPI_ERROR = MPI_ERR_EXITED; - stats[idst].count = 0; - err = MPI_ERR_IN_STATUS; - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + reconnectSock = ConnectSockPeer( node, idst, reestablishConnections ); + if (reconnectSock == -1) { - trace_printf( "%s@%d - Setting Node %s (%d) status to " - "stats[%d].MPI_ERROR=%s\n" - , method_name, __LINE__ - , node->GetName(), node->GetPNid() - , idst - , ErrorMsg(stats[idst].MPI_ERROR) ); + stats[idst].MPI_ERROR = MPI_ERR_EXITED; + stats[idst].count = 0; + err = MPI_ERR_IN_STATUS; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Setting Node %s (%d) status to " + "stats[%d].MPI_ERROR=%s\n" + , method_name, __LINE__ + , node->GetName(), 
node->GetPNid() + , idst + , ErrorMsg(stats[idst].MPI_ERROR) ); + } } - if (node->GetState() != State_Up) + } + else + { + if (socks_[idst] != -1) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { @@ -5470,6 +5380,19 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) EpollCtlDelete( epollFD_, socks_[idst], &event ); socks_[idst] = -1; } + reconnectSock = -1; + stats[idst].MPI_ERROR = MPI_ERR_EXITED; + stats[idst].count = 0; + err = MPI_ERR_IN_STATUS; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Setting Node %s (%d) status to " + "stats[%d].MPI_ERROR=%s\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() + , idst + , ErrorMsg(stats[idst].MPI_ERROR) ); + } } } else if ( j == MyPNID ) @@ -5491,6 +5414,18 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) , node->GetName(), node->GetPNid() , idst, socks_[idst] ); } + stats[idst].MPI_ERROR = MPI_ERR_EXITED; + stats[idst].count = 0; + err = MPI_ERR_IN_STATUS; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Setting Node %s (%d) status to " + "stats[%d].MPI_ERROR=%s\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() + , idst + , ErrorMsg(stats[idst].MPI_ERROR) ); + } // Remove old socket from epoll set, it may not be there struct epoll_event event; event.data.fd = socks_[idst]; @@ -5508,7 +5443,7 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) } if (PingSockPeer(node)) { - reconnectSock = AcceptSockPeer( node, idst ); + reconnectSock = AcceptSockPeer( node, idst, reestablishConnections ); if (reconnectSock == -1) { stats[idst].MPI_ERROR = MPI_ERR_EXITED; @@ -5523,24 +5458,6 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) , idst , ErrorMsg(stats[idst].MPI_ERROR) ); } - if (node->GetState() != State_Up) - { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - Node %s (%d) is not up, " - "removing old socket from epoll set, " - "socks_[%d]=%d\n" 
- , method_name, __LINE__ - , node->GetName(), node->GetPNid() - , idst, socks_[idst] ); - } - // Remove old socket from epoll set, it may not be there - struct epoll_event event; - event.data.fd = socks_[idst]; - event.events = 0; - EpollCtlDelete( epollFD_, socks_[idst], &event ); - socks_[idst] = -1; - } } } else @@ -5561,6 +5478,7 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) event.data.fd = socks_[idst]; event.events = 0; EpollCtlDelete( epollFD_, socks_[idst], &event ); + socks_[idst] = -1; } reconnectSock = -1; stats[idst].MPI_ERROR = MPI_ERR_EXITED; @@ -5618,14 +5536,13 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats ) return( err ); } -int CCluster::AcceptSockPeer( CNode *node, int peer ) +int CCluster::AcceptSockPeer( CNode *node, int peer, bool reestablishConnections ) { const char method_name[] = "CCluster::AcceptSockPeer"; TRACE_ENTRY; int rc = MPI_SUCCESS; int reconnectSock = -1; - unsigned char srcaddr[4]; struct hostent *he; // Get my host structure via my node name @@ -5646,13 +5563,9 @@ int CCluster::AcceptSockPeer( CNode *node, int peer ) { if (trace_settings & TRACE_RECOVERY) { - trace_printf( "%s@%d Accepting server socket: from %s(%d), src=%d.%d.%d.%d, port=%d\n" + trace_printf( "%s@%d Accepting server socket: from %s(%d), port=%d\n" , method_name, __LINE__ , node->GetName(), node->GetPNid() - , (int)((unsigned char *)srcaddr)[0] - , (int)((unsigned char *)srcaddr)[1] - , (int)((unsigned char *)srcaddr)[2] - , (int)((unsigned char *)srcaddr)[3] , MyNode->GetSyncSocketPort() ); } @@ -5679,17 +5592,40 @@ int CCluster::AcceptSockPeer( CNode *node, int peer ) rc = -1; } - if (socks_[peer] != -1) + if (reestablishConnections) { - // Remove old socket from epoll set, it may not be there - struct epoll_event event; - event.data.fd = socks_[peer]; - event.events = 0; - EpollCtlDelete( epollFD_, socks_[peer], &event ); + if (socks_[peer] != -1) + { + // Remove old socket from epoll set, it may not be there + struct epoll_event 
event; + event.data.fd = socks_[peer]; + event.events = 0; + EpollCtlDelete( epollFD_, socks_[peer], &event ); + if (node->GetState() != State_Up) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Node %s (%d) is not up, " + "removing old socket from epoll set, " + "socks_[%d]=%d\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() + , peer, socks_[peer] ); + } + socks_[peer] = -1; + } + } + if (reconnectSock != -1) + { + socks_[peer] = reconnectSock; + } } - if (reconnectSock != -1) + else { - socks_[peer] = reconnectSock; + if (reconnectSock != -1) + { + close( (int)reconnectSock ); + } } } @@ -5697,7 +5633,7 @@ int CCluster::AcceptSockPeer( CNode *node, int peer ) return rc; } -int CCluster::ConnectSockPeer( CNode *node, int peer ) +int CCluster::ConnectSockPeer( CNode *node, int peer, bool reestablishConnections ) { const char method_name[] = "CCluster::ConnectSockPeer"; TRACE_ENTRY; @@ -5791,17 +5727,40 @@ int CCluster::ConnectSockPeer( CNode *node, int peer ) rc = -1; } - if (socks_[peer] != -1) + if (reestablishConnections) { - // Remove old socket from epoll set, it may not be there - struct epoll_event event; - event.data.fd = socks_[peer]; - event.events = 0; - EpollCtlDelete( epollFD_, socks_[peer], &event ); + if (socks_[peer] != -1) + { + // Remove old socket from epoll set, it may not be there + struct epoll_event event; + event.data.fd = socks_[peer]; + event.events = 0; + EpollCtlDelete( epollFD_, socks_[peer], &event ); + if (node->GetState() != State_Up) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Node %s (%d) is not up, " + "removing old socket from epoll set, " + "socks_[%d]=%d\n" + , method_name, __LINE__ + , node->GetName(), node->GetPNid() + , peer, socks_[peer] ); + } + socks_[peer] = -1; + } + } + if (reconnectSock != -1) + { + socks_[peer] = reconnectSock; + } } - if (reconnectSock != -1) + else { - socks_[peer] = reconnectSock; + if (reconnectSock 
!= -1) + { + close( (int)reconnectSock ); + } } } @@ -6118,14 +6077,21 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] ) if ( GetConfigPNodesCount() == 1 ) return true; - // Count occurrences of sequence numbers from other nodes + // Count occurrences of sequence numbers for (int pnid = 0; pnid < GetConfigPNodesMax(); pnid++) { CNode *node= Nodes->GetNode( pnid ); if (!node) continue; if (node->GetState() != State_Up) continue; - seqNum = nodestate[pnid].seq_num; + if ( pnid == MyPNID ) + { + seqNum = nodestate[pnid].seq_num = seqNum_; + } + else + { + seqNum = nodestate[pnid].seq_num; + } if (trace_settings & TRACE_SYNC) { @@ -6183,7 +6149,7 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] ) if (trace_settings & TRACE_SYNC) { - if ( seqNum_ != seqNumBucket[mostCountsIndex] ) + if ( lowSeqNum_ != highSeqNum_ ) { trace_printf( "%s@%d Most common seq num=%lld (%d nodes), " "%d buckets, low=%lld, high=%lld, local seq num (%lld) did not match.\n" @@ -6197,8 +6163,8 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] ) } } - // Fail if my seqnum does not match majority - return seqNum_ == seqNumBucket[mostCountsIndex]; + // Fail if any sequence number does not match + return( lowSeqNum_ == highSeqNum_ ); } void CCluster::HandleDownNode( int pnid ) @@ -6353,13 +6319,9 @@ void CCluster::UpdateClusterState( bool &doShutdown, { if (!noComm) { - trace_printf( "%s@%d - Communication error from node %d:\n" - " node_state=%d\n" - " change_nid=%d\n" - " seq_num=#%lld\n" + trace_printf( "%s@%d - Communication error from node %d, " + " seq_num=#%lld\n" , method_name, __LINE__, index - , recvBuf->nodeInfo.node_state - , recvBuf->nodeInfo.change_nid , seqNum_ ); } } @@ -6803,7 +6765,18 @@ bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf, } // if we have already processed buffer, skip it - if (lastSeqNum_ == msgBuf->nodeInfo.seq_num) continue; + if (lastSeqNum_ >= msgBuf->nodeInfo.seq_num) continue; + + if 
(trace_settings & TRACE_SYNC) + { + trace_printf("%s@%d - Processing buffer for node %d, swpRecCount_=%d, seq_num=%lld, " + "lastSeqNum_=%lld, msg_count=%d, msg_offset=%d\n", + method_name, __LINE__, i, swpRecCount_, + msgBuf->nodeInfo.seq_num, + lastSeqNum_, + msgBuf->msgInfo.msg_count, + msgBuf->msgInfo.msg_offset); + } // reset msg length to zero to initialize for PopMsg() msgBuf->msgInfo.msg_offset = 0; @@ -6814,7 +6787,7 @@ bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf, if ( deferredTmSync ) { // This node has sent a TmSync message. Process it now. if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) - trace_printf("%s@%d - Handling deferred TmSync message for " + trace_printf("%s@%d - Handling deferred TmSync messages for " "node %d\n", method_name, __LINE__, i); struct internal_msg_def *msg; @@ -6840,16 +6813,9 @@ bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf, } else if ( !deferredTmSync ) { - // temp trace - if (trace_settings & TRACE_SYNC) - { - trace_printf("%s@%d - For node %d, swpRecCount_=%d, " - "seq_num=%lld,msg_count=%d, msg_offset=%d\n", - method_name, __LINE__, i, swpRecCount_, - msgBuf->nodeInfo.seq_num, - msgBuf->msgInfo.msg_count, - msgBuf->msgInfo.msg_offset); - } + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf("%s@%d - Handling messages for " + "node %d\n", method_name, __LINE__, i); do { // Get the next sync msg for the node @@ -7103,15 +7069,20 @@ bool CCluster::exchangeNodeData ( ) reconnected: if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) - trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, message count=%d " - "message seq_num=%lld, seqNum_=%lld, lastSeqNum_=%lld\n" + trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, " + "message count=%d, message seq_num=%lld, " + "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, " + "highSeqNum_=%lld, reconnectSeqNum_=%lld\n" , method_name, __LINE__ , Nodes->GetSyncSize() , swpRecCount_ , 
send_buffer->msgInfo.msg_count , send_buffer->nodeInfo.seq_num , seqNum_ - , lastSeqNum_); + , lastSeqNum_ + , lowSeqNum_ + , highSeqNum_ + , reconnectSeqNum_); struct timespec ts_ag_begin; clock_gettime(CLOCK_REALTIME, &ts_ag_begin); @@ -7191,24 +7162,36 @@ reconnected: , recv_buffer , status , send_buffer->nodeInfo.change_nid); - } - if ( ProcessClusterData( recv_buffer, send_buffer, false ) ) - { // There is a TmSync message remaining to be handled - ProcessClusterData( recv_buffer, send_buffer, true ); - } - else - { if ( lastAllgatherWithLastSyncBuffer ) { seqNum_ = savedSeqNum; lastAllgatherWithLastSyncBuffer = false; send_buffer = Nodes->GetSyncBuffer(); + + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - Resetting lastAllgatherWithLastSyncBuffer=%d\n" + , method_name, __LINE__ + , lastAllgatherWithLastSyncBuffer); + goto reconnected; } if ( reconnectSeqNum_ != 0 ) { + + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - Allgather IO retry, swpRecCount_=%d, " + "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, " + "highSeqNum_=%lld, reconnectSeqNum_=%lld\n" + , method_name, __LINE__ + , swpRecCount_ + , seqNum_ + , lastSeqNum_ + , lowSeqNum_ + , highSeqNum_ + , reconnectSeqNum_); + // The Allgather() has executed a reconnect at reconnectSeqNum_. 
// The UpdateClusterState has set the lowSeqNum_and highSeqNum_ // in the current IO exchange which will indicate whether there is @@ -7224,6 +7207,12 @@ reconnected: // Indicate to follow up the next exchange with current SyncBuffer lastAllgatherWithLastSyncBuffer = true; lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; + + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - Setting lastAllgatherWithLastSyncBuffer=%d\n" + , method_name, __LINE__ + , lastAllgatherWithLastSyncBuffer); + goto reconnected; } else if (seqNum_ < highSeqNum_) @@ -7231,10 +7220,25 @@ reconnected: // Redo exchange with the current SyncBuffer send_buffer = Nodes->GetSyncBuffer(); lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; + + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - lastAllgatherWithLastSyncBuffer=%d\n" + , method_name, __LINE__ + , lastAllgatherWithLastSyncBuffer); + goto reconnected; } + lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; } + } + + if ( ProcessClusterData( recv_buffer, send_buffer, false ) ) + { // There is a TmSync message remaining to be handled + ProcessClusterData( recv_buffer, send_buffer, true ); + } + if (swpRecCount_ == 1) + { // Save the sync buffer and corresponding sequence number we just processed // On reconnect we must resend the last buffer and the current buffer // to ensure dropped buffers are processed by all monitor processe in the @@ -7246,22 +7250,21 @@ reconnected: if ( ++seqNum_ == 0) seqNum_ = 1; } - // ?? Need the following? Possibly not since maybe all sync cycle - // dependent code was removed -- need to check. 
// Wake up any threads waiting on the completion of a sync cycle syncCycle_.wakeAll(); if (doShutdown) result = checkIfDone( ); - --swpRecCount_; - if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf( "%s@%d - node data exchange completed, swpRecCount_=%d, " - "seqNum_=%lld, lastSeqNum_=%lld\n" + "seqNum_=%lld, lastSeqNum_=%lld, reconnectSeqNum_=%lld\n" , method_name, __LINE__ , swpRecCount_ , seqNum_ - , lastSeqNum_); + , lastSeqNum_ + , reconnectSeqNum_); + + --swpRecCount_; TRACE_EXIT; @@ -7329,16 +7332,21 @@ void CCluster::exchangeTmSyncData ( struct sync_def *sync, bool bumpSync ) reconnected: - if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC)) - trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, message count=%d " - "message seq_num=%lld, seqNum_=%lld, lastSeqNum_=%lld\n" + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, " + "message count=%d, message seq_num=%lld, " + "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, " + "highSeqNum_=%lld, reconnectSeqNum_=%lld\n" , method_name, __LINE__ , Nodes->GetSyncSize() , swpRecCount_ , send_buffer->msgInfo.msg_count , send_buffer->nodeInfo.seq_num , seqNum_ - , lastSeqNum_); + , lastSeqNum_ + , lowSeqNum_ + , highSeqNum_ + , reconnectSeqNum_); struct timespec ts_ag_begin; clock_gettime(CLOCK_REALTIME, &ts_ag_begin); @@ -7418,68 +7426,103 @@ reconnected: , recv_buffer , status , send_buffer->nodeInfo.change_nid); - } - if ( ProcessClusterData( recv_buffer, send_buffer, false ) ) - { // There is a TmSync message remaining to be handled - ProcessClusterData( recv_buffer, send_buffer, true ); - } + if ( lastAllgatherWithLastSyncBuffer ) + { + seqNum_ = savedSeqNum; + lastAllgatherWithLastSyncBuffer = false; + send_buffer = Nodes->GetSyncBuffer(); - if ( lastAllgatherWithLastSyncBuffer ) - { - seqNum_ = savedSeqNum; - lastAllgatherWithLastSyncBuffer = false; - send_buffer = Nodes->GetSyncBuffer(); - goto reconnected; - } + 
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - Resetting lastAllgatherWithLastSyncBuffer=%d\n" + , method_name, __LINE__ + , lastAllgatherWithLastSyncBuffer); - if ( reconnectSeqNum_ != 0 ) - { - // The Allgather() has executed a reconnect at reconnectSeqNum_. - // The UpdateClusterState has set the lowSeqNum_and highSeqNum_ - // in the current IO exchange which will indicate whether there is - // a mismatch in IOs between monitor processes. If there is a mismatch, - // the lowSeqNum_and highSeqNum_ relative to our current seqNum_ - // will determine how to redrive the exchange of node data. - if (seqNum_ > lowSeqNum_) - { // A remote monitor did not receive our last SyncBuffer - // Redo exchange with the previous SyncBuffer - send_buffer = Nodes->GetLastSyncBuffer(); - savedSeqNum = seqNum_; - seqNum_ = lastSeqNum_; - // Indicate to follow up the next exchange with current SyncBuffer - lastAllgatherWithLastSyncBuffer = true; - lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; goto reconnected; } - else if (seqNum_ < highSeqNum_) - { // The local monitor did not receive the last remote SyncBuffer - // Redo exchange with the current SyncBuffer - send_buffer = Nodes->GetSyncBuffer(); + + if ( reconnectSeqNum_ != 0 ) + { + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - Allgather IO retry, swpRecCount_=%d, " + "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, " + "highSeqNum_=%lld, reconnectSeqNum_=%lld\n" + , method_name, __LINE__ + , swpRecCount_ + , seqNum_ + , lastSeqNum_ + , lowSeqNum_ + , highSeqNum_ + , reconnectSeqNum_); + + // The Allgather() has executed a reconnect at reconnectSeqNum_. + // The UpdateClusterState has set the lowSeqNum_and highSeqNum_ + // in the current IO exchange which will indicate whether there is + // a mismatch in IOs between monitor processes. 
If there is a mismatch, + // the lowSeqNum_and highSeqNum_ relative to our current seqNum_ + // will determine how to redrive the exchange of node data. + if (seqNum_ > lowSeqNum_) + { // A remote monitor did not receive our last SyncBuffer + // Redo exchange with the previous SyncBuffer + send_buffer = Nodes->GetLastSyncBuffer(); + savedSeqNum = seqNum_; + seqNum_ = lastSeqNum_; + // Indicate to follow up the next exchange with current SyncBuffer + lastAllgatherWithLastSyncBuffer = true; + lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; + + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - Setting lastAllgatherWithLastSyncBuffer=%d\n" + , method_name, __LINE__ + , lastAllgatherWithLastSyncBuffer); + + goto reconnected; + } + else if (seqNum_ < highSeqNum_) + { // The local monitor did not receive the last remote SyncBuffer + // Redo exchange with the current SyncBuffer + send_buffer = Nodes->GetSyncBuffer(); + lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; + + if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) + trace_printf( "%s@%d - lastAllgatherWithLastSyncBuffer=%d\n" + , method_name, __LINE__ + , lastAllgatherWithLastSyncBuffer); + + goto reconnected; + } lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0; - goto reconnected; } } - // Save the sync buffer and corresponding sequence number we just processed - // On reconnect we must resend the last buffer and the current buffer - // to ensure dropped buffers are processed by all monitor processe in the - // correct order - Nodes->SaveMyLastSyncBuffer(); - lastSeqNum_ = seqNum_; - - // Increment count of "Allgather" calls. If wrap-around, start again at 1. 
- if ( ++seqNum_ == 0) seqNum_ = 1; + if ( ProcessClusterData( recv_buffer, send_buffer, false ) ) + { // There is a TmSync message remaining to be handled + ProcessClusterData( recv_buffer, send_buffer, true ); + } - --swpRecCount_; + if (swpRecCount_ == 1) + { + // Save the sync buffer and corresponding sequence number we just processed + // On reconnect we must resend the last buffer and the current buffer + // to ensure dropped buffers are processed by all monitor processe in the + // correct order + Nodes->SaveMyLastSyncBuffer(); + lastSeqNum_ = seqNum_; + + // Increment count of "Allgather" calls. If wrap-around, start again at 1. + if ( ++seqNum_ == 0) seqNum_ = 1; + } if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf( "%s@%d - node data exchange completed, swpRecCount_=%d, " - "seqNum_=%lld, lastSeqNum_=%lld\n" + "seqNum_=%lld, lastSeqNum_=%lld, reconnectSeqNum_=%lld\n" , method_name, __LINE__ , swpRecCount_ , seqNum_ - , lastSeqNum_); + , lastSeqNum_ + , reconnectSeqNum_); + + --swpRecCount_; TRACE_EXIT; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cluster.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h index e743341..58d3540 100644 --- a/core/sqf/monitor/linux/cluster.h +++ b/core/sqf/monitor/linux/cluster.h @@ -336,9 +336,9 @@ private: int Allgather(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats); int AllgatherIB(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats); int AllgatherSock(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats); - int AllgatherSockReconnect( MPI_Status *stats ); - int AcceptSockPeer( CNode *node, int peer ); - int ConnectSockPeer( CNode *node, int peer ); + int AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnections = false ); + int AcceptSockPeer( CNode *node, int peer, bool reestablishConnections = false ); + int 
ConnectSockPeer( CNode *node, int peer, bool reestablishConnections = false ); void ValidateClusterState( cluster_state_def_t nodestate[], bool haveDivergence ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cmsh.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cmsh.cxx b/core/sqf/monitor/linux/cmsh.cxx index c8c4975..709b293 100644 --- a/core/sqf/monitor/linux/cmsh.cxx +++ b/core/sqf/monitor/linux/cmsh.cxx @@ -91,6 +91,46 @@ int CCmsh::PopulateClusterState( void ) /////////////////////////////////////////////////////////////////////////////// // +// Function/Method: CCmsh::PopulateNodeState +// +// Description: Executes the command string passed in the constructor and +// populates the internal node state list with the state of each node +// in the cluster. Clients can then inquire about state of each node. +// +// Return: +// 0 - success +// -1 - failure +// +/////////////////////////////////////////////////////////////////////////////// +int CCmsh::PopulateNodeState( const char *nodeName ) +{ + const char method_name[] = "CCmsh::PopulateNodeState"; + TRACE_ENTRY; + + int rc; + + // The caller should save and close stdin before calling this proc + // and restore it when done. This is to prevent ssh from consuming + // caller's stdin contents when executing the command. 
+ string commandArgs; + { + commandArgs = "-n "; + commandArgs += nodeName; + } + rc = ExecuteCommand( commandArgs.c_str(), nodeStateList_ ); + if ( rc == -1 ) + { + char la_buf[MON_STRING_BUF_SIZE]; + sprintf(la_buf, "[%s] Error: While executing '%s' command\n", method_name, command_.data()); + mon_log_write(MON_CMSH_GET_CLUSTER_STATE_1, SQ_LOG_ERR, la_buf); + } + + TRACE_EXIT; + return( rc ); +} + +/////////////////////////////////////////////////////////////////////////////// +// // Function/Method: CCmsh::GetClusterState // // Description: Updates the state of the nodes in the physicalNodeMap passed in @@ -128,31 +168,97 @@ int CCmsh::GetClusterState( PhysicalNodeNameMap_t &physicalNodeMap ) if (it != physicalNodeMap.end()) { // TEST_POINT and Exclude List : to force state down on node name - const char *downNodeName = getenv( TP001_NODE_DOWN ); - const char *downNodeList = getenv( TRAF_EXCLUDE_LIST ); - string downNodeString = " "; - if (downNodeList) - { - downNodeString += downNodeList; - downNodeString += " "; - } - string downNodeToFind = " "; - downNodeToFind += nodeName.c_str(); - downNodeToFind += " "; - if (((downNodeList != NULL) && - strstr(downNodeString.c_str(),downNodeToFind.c_str())) || - ( (downNodeName != NULL) && - !strcmp( downNodeName, nodeName.c_str()) )) - { - nodeState = StateDown; - } - + const char *downNodeName = getenv( TP001_NODE_DOWN ); + const char *downNodeList = getenv( TRAF_EXCLUDE_LIST ); + string downNodeString = " "; + if (downNodeList) + { + downNodeString += downNodeList; + downNodeString += " "; + } + string downNodeToFind = " "; + downNodeToFind += nodeName.c_str(); + downNodeToFind += " "; + if (((downNodeList != NULL) && + strstr(downNodeString.c_str(),downNodeToFind.c_str())) || + ((downNodeName != NULL) && + !strcmp(downNodeName, nodeName.c_str()))) + { + nodeState = StateDown; + } + // Set physical node state physicalNode = it->second; physicalNode->SetState( nodeState ); } } - } + } + + TRACE_EXIT; + return( rc ); 
+} + +/////////////////////////////////////////////////////////////////////////////// +// +// Function/Method: CCmsh::GetNodeState +// +// Description: Updates the state of the nodeName in the physicalNode passed in +// as a parameter. Caller should ensure that the node names are already +// present in the physicalNodeMap. +// +// Return: +// 0 - success +// -1 - failure +// +/////////////////////////////////////////////////////////////////////////////// +int CCmsh::GetNodeState( char *name ,CPhysicalNode *physicalNode ) +{ + const char method_name[] = "CCmsh::GetNodeState"; + TRACE_ENTRY; + + int rc; + + rc = PopulateNodeState( name ); + + if ( rc != -1 ) + { + // Parse each line extracting name and state + string nodeName; + NodeState_t nodeState; + PhysicalNodeNameMap_t::iterator it; + + StringList_t::iterator alit; + for ( alit = nodeStateList_.begin(); alit != nodeStateList_.end() ; alit++ ) + { + ParseNodeStatus( *alit, nodeName, nodeState ); + + // TEST_POINT and Exclude List : to force state down on node name + const char *downNodeName = getenv( TP001_NODE_DOWN ); + const char *downNodeList = getenv( TRAF_EXCLUDE_LIST ); + string downNodeString = " "; + if (downNodeList) + { + downNodeString += downNodeList; + downNodeString += " "; + } + string downNodeToFind = " "; + downNodeToFind += nodeName.c_str(); + downNodeToFind += " "; + if (((downNodeList != NULL) && + strstr(downNodeString.c_str(),downNodeToFind.c_str())) || + ((downNodeName != NULL) && + !strcmp(downNodeName, nodeName.c_str()))) + { + nodeState = StateDown; + } + + if (!strcmp(name, nodeName.c_str())) + { + // Set physical node state + physicalNode->SetState( nodeState ); + } + } + } TRACE_EXIT; return( rc ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cmsh.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/cmsh.h b/core/sqf/monitor/linux/cmsh.h index dce2e79..f1226bd 100644 --- 
a/core/sqf/monitor/linux/cmsh.h +++ b/core/sqf/monitor/linux/cmsh.h @@ -44,6 +44,7 @@ public: int PopulateClusterState( void ); int GetClusterState( PhysicalNodeNameMap_t &physicalNodeMap ); + int GetNodeState( char *name ,CPhysicalNode *physicalNode ); bool IsInitialized( void ); void ClearClusterState( void ) { nodeStateList_.clear(); } NodeState_t GetNodeState( char nodeName[] ); @@ -52,6 +53,7 @@ private: NodeStateList_t nodeStateList_; void ParseNodeStatus( string &nodeStatus, string &nodeName, NodeState_t &state ); + int PopulateNodeState( const char *nodeName ); }; #endif /*CMSH_H_*/ http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/commaccept.cxx ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx index a9045af..21b30a6 100644 --- a/core/sqf/monitor/linux/commaccept.cxx +++ b/core/sqf/monitor/linux/commaccept.cxx @@ -942,7 +942,7 @@ void CCommAccept::commAcceptorIB() { char buf[MON_STRING_BUF_SIZE]; MPI_Error_class( rc, &errClass ); - snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n", + snprintf(buf, sizeof(buf), "[%s], cannot accept remote monitor: %s.\n", method_name, ErrorMsg(rc)); mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf); @@ -1101,13 +1101,44 @@ void CCommAccept::start() TRACE_EXIT; } -void CCommAccept::setAccepting( bool accepting ) +void CCommAccept::startAccepting( void ) { + const char method_name[] = "CCommAccept::startAccepting"; + TRACE_ENTRY; + + CAutoLock lock( getLocker( ) ); + + if ( !accepting_ ) + { + accepting_ = true; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Enabling accepting_=%d\n" + , method_name, __LINE__, accepting_ ); + } + CLock::wakeOne(); + } + + TRACE_EXIT; +} + +void CCommAccept::stopAccepting( void ) +{ + const char method_name[] = "CCommAccept::stopAccepting"; + TRACE_ENTRY; + CAutoLock lock( getLocker( ) ); - 
accepting_ = accepting; if ( accepting_ ) { + accepting_ = false; + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Disabling accepting_=%d\n" + , method_name, __LINE__, accepting_ ); + } CLock::wakeOne(); } + + TRACE_EXIT; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/commaccept.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/commaccept.h b/core/sqf/monitor/linux/commaccept.h index ac85efb..c32d975 100644 --- a/core/sqf/monitor/linux/commaccept.h +++ b/core/sqf/monitor/linux/commaccept.h @@ -41,7 +41,8 @@ public: bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); } void processNewComm( MPI_Comm interComm ); void processNewSock( int sockFd ); - void setAccepting( bool accepting ); + void startAccepting( void ); + void stopAccepting( void ); void start( void ); void shutdownWork( void ); http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/internal.h ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/internal.h b/core/sqf/monitor/linux/internal.h index f69f424..b0f118c 100644 --- a/core/sqf/monitor/linux/internal.h +++ b/core/sqf/monitor/linux/internal.h @@ -442,7 +442,7 @@ typedef struct nodeId_s int creatorPNid; int creatorShellPid; Verifier_t creatorShellVerifier; - bool creator; // NEW monitor set to true to tell creator it is the CREATOR + bool creator; // NEW monitor sets to true to tell creator it is the CREATOR bool ping; // Monitor sets to true to tell remote monitor // it is just checking that it can communicate with it. 
// Used during allgather reconnect http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/macros.gmk ---------------------------------------------------------------------- diff --git a/core/sqf/monitor/linux/macros.gmk b/core/sqf/monitor/linux/macros.gmk index e5324f1..5bb24ec 100644 --- a/core/sqf/monitor/linux/macros.gmk +++ b/core/sqf/monitor/linux/macros.gmk @@ -53,8 +53,8 @@ endif MPI_CC := $(CC) MPI_CXX := $(CXX) -CC = mpicc -CXX = $(MPICH_ROOT)/bin/mpicxx +CC = mpicc $(ARCH_SPECIFIC_OPTION) +CXX = $(MPICH_ROOT)/bin/mpicxx $(ARCH_SPECIFIC_OPTION) ifeq ($(SQ_MTYPE),32) CC += -mpi32
