[TRAFODION-2883] Preliminary Scale Enhancements
- Added timestamps to node down system message
- Added timestamps and values to registry change notifications
- Fixed monitor trace causing memory overwrites
- Implemented AGENT mode monitor functionality
o This is a preliminary change to remove the dependency on OpenMPI during
initialization of the operational cluster by creating a cluster of one node
(MASTER monitor) that the other remote nodes (SLAVE monitors) join
through the MASTER
- Implemented MASTER monitor selection logic
- Fixed scale bugs found when creating clusters larger than 120 nodes
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/bded0e84
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/bded0e84
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/bded0e84
Branch: refs/heads/master
Commit: bded0e843f8b600a5459c5353bbdc9f59d6d6551
Parents: 887051c
Author: Zalo Correa <[email protected]>
Authored: Tue Feb 27 17:34:25 2018 -0800
Committer: Zalo Correa <[email protected]>
Committed: Tue Feb 27 17:34:25 2018 -0800
----------------------------------------------------------------------
.../export/include/common/evl_sqlog_eventnum.h | 14 +
core/sqf/monitor/linux/cluster.cxx | 291 +++++++++++++--
core/sqf/monitor/linux/cluster.h | 7 +
core/sqf/monitor/linux/commaccept.cxx | 108 +++---
core/sqf/monitor/linux/mlio.cxx | 8 +-
core/sqf/monitor/linux/monitor.cxx | 370 ++++++++++++++++---
core/sqf/monitor/linux/monitor.h | 2 -
core/sqf/monitor/linux/pnode.cxx | 55 +--
core/sqf/monitor/linux/process.cxx | 75 +++-
core/sqf/monitor/linux/reqqueue.cxx | 1 +
core/sqf/monitor/linux/zclient.cxx | 343 ++++++++++++++++-
core/sqf/monitor/linux/zclient.h | 5 +
core/sqf/sqenvcom.sh | 12 +
core/sqf/sql/scripts/sqnodes.pm | 4 +-
core/sqf/src/trafconf/clusterconf.cpp | 10 +
core/sqf/src/trafconf/clusterconf.h | 4 +
16 files changed, 1131 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/export/include/common/evl_sqlog_eventnum.h
----------------------------------------------------------------------
diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h
b/core/sqf/export/include/common/evl_sqlog_eventnum.h
index 96c3df9..0418c70 100644
--- a/core/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -255,6 +255,12 @@
#define MON_MONITOR_MAIN_9 101020109
#define MON_MONITOR_MAIN_10 101020110
#define MON_MONITOR_MAIN_11 101020111
+#define MON_MONITOR_MAIN_12 101020112
+#define MON_MONITOR_MAIN_13 101020113
+#define MON_MONITOR_MAIN_14 101020114
+#define MON_MONITOR_MAIN_15 101020115
+#define MON_MONITOR_MAIN_16 101020116
+#define MON_MONITOR_MAIN_17 101020117
#define MON_MONITOR_TMLEADER_1 101020201
#define MON_MONITOR_TMLEADER_2 101020202
#define MON_MONITOR_DEATH_HANDLER_1 101020301
@@ -895,6 +901,14 @@
#define MON_ZCLIENT_ISZNODEEXPIRED_2 101371802
#define MON_ZCLIENT_CHECKMYZNODE_1 101371901
#define MON_ZCLIENT_CHECKMYZNODE_2 101371902
+#define MON_ZCLIENT_AMICONFIGUREDMASTER_1 101372101
+#define MON_ZCLIENT_AMICONFIGUREDMASTER_2 101372102
+#define MON_ZCLIENT_WAITFORANDRETURNMASTER 101372103
+#define MON_ZCLIENT_CREATEMASTERZNODE 101372104
+#define MON_ZCLIENT_WATCHMASTERNODEDELETE_1 101372105
+#define MON_ZCLIENT_WATCHMASTERNODEDELETE_2 101372106
+#define MON_ZCLIENT_WATCHMASTERNODEDELETE_3 101372107
+#define MON_ZCLIENT_CREATEORSETMASTERWATCH 101372108
/* Module: zconfig.cxx = 38 */
#define ZCONFIG_ZCONFIG_1 101380101
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/cluster.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.cxx
b/core/sqf/monitor/linux/cluster.cxx
index 49697dd..d1b3e91 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -67,6 +67,11 @@ using namespace std;
extern bool IAmIntegrating;
extern bool IAmIntegrated;
extern bool IsRealCluster;
+extern bool IsAgentMode;
+extern bool IsMaster;
+extern bool IsMPIChild;
+extern char MasterMonitorName[MAX_PROCESS_PATH];
+extern char Node_name[MPI_MAX_PROCESSOR_NAME];
extern bool ZClientEnabled;
extern char IntegratingMonitorPort[MPI_MAX_PORT_NAME];
extern char MyCommPort[MPI_MAX_PORT_NAME];
@@ -289,11 +294,12 @@ void CCluster::NodeTmReady( int nid )
if (trace_settings & (TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC |
TRACE_TMSYNC))
{
- trace_printf( "%s@%d - TmReady, nid=%d, tm count=%d, soft node
down=%d\n"
+ trace_printf( "%s@%d - TmReady, nid=%d, tm count=%d, soft node
down=%d, LNodesCount=%d\n"
, method_name, __LINE__
, nid
, tmReadyCount_
- , MyNode->IsSoftNodeDown() );
+ , MyNode->IsSoftNodeDown()
+ , MyNode->GetLNodesCount() );
}
MyNode->StartPStartDPersistentDTM( nid );
@@ -352,6 +358,131 @@ void CCluster::NodeReady( CNode *spareNode )
TRACE_EXIT;
}
+// Assign leaders as required
+// Current leaders are TM Leader and Monitor Leader
+void CCluster::AssignLeaders( int pnid, bool checkProcess )
+{
+ const char method_name[] = "CCluster::AssignLeaders";
+ TRACE_ENTRY;
+
+ AssignTmLeader ( pnid, checkProcess );
+ AssignMonitorLeader ( pnid );
+
+ TRACE_EXIT;
+}
+
+// Assign a new monitor leader if node pnid (the current monitor leader) has failed
+void CCluster::AssignMonitorLeader( int pnid )
+{
+ const char method_name[] = "CCluster::AssignMonitorLeader";
+ TRACE_ENTRY;
+
+ int i = 0;
+ int rc = 0;
+
+ int lMonitorLeaderPNid = MonitorLeaderPNid;
+ CNode *node = NULL;
+
+ if (MonitorLeaderPNid != pnid)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST |
TRACE_SYNC | TRACE_TMSYNC))
+ {
+ trace_printf( "%s@%d" " - (MasterMonitor) returning, pnid %d !=
monitorLead %d\n"
+ , method_name, __LINE__, pnid, MonitorLeaderPNid );
+ }
+ return;
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST |
TRACE_SYNC | TRACE_TMSYNC))
+ {
+ trace_printf( "%s@%d" " - (MasterMonitor) Node " "%d" " MonitorLeader
failed!\n"
+ , method_name, __LINE__, MonitorLeaderPNid );
+ }
+
+ for (i=0; i<GetConfigPNodesMax(); i++)
+ {
+ lMonitorLeaderPNid++;
+
+ if (lMonitorLeaderPNid == GetConfigPNodesMax())
+ {
+ lMonitorLeaderPNid = 0; // restart with nid 0
+ }
+
+ if (lMonitorLeaderPNid == pnid)
+ {
+ continue; // this is the node that is going down, skip it
+ }
+
+ if (Node[lMonitorLeaderPNid] == NULL)
+ {
+ continue;
+ }
+
+ node = Node[lMonitorLeaderPNid];
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST |
TRACE_SYNC | TRACE_TMSYNC))
+ {
+ trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s,
isSoftNodeDown=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , NodePhaseString(node->GetPhase())
+ , node->IsSoftNodeDown());
+ }
+
+ if ( node->IsSpareNode() ||
+ node->IsSoftNodeDown() ||
+ node->GetState() != State_Up ||
+ node->GetPhase() != Phase_Ready )
+ {
+ continue; // skip this node for any of the above reasons
+ }
+
+ MonitorLeaderPNid = node->GetPNid();
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST |
TRACE_SYNC | TRACE_TMSYNC))
+ {
+ trace_printf("%s@%d" " - Node " "%d" " is the new
MonitorLeaderPNid." "\n", method_name, __LINE__, MonitorLeaderPNid);
+ }
+
+ if (ZClientEnabled)
+ {
+ rc = ZClient->CreateMasterZNode ( node->GetName() );
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST
| TRACE_SYNC | TRACE_TMSYNC))
+ {
+ trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader
CreateMasterZNode with rc = %d\n", method_name, __LINE__, rc);
+ }
+ if ( (rc == ZOK) || (rc == ZNODEEXISTS) )
+ {
+ if ( IsAgentMode )
+ {
+ rc = ZClient->WatchMasterNode( node->GetName( ) );
+ if ( trace_settings & (TRACE_INIT | TRACE_RECOVERY |
TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC) )
+ {
+ trace_printf( "%s@%d" " (MasterMonitor)
AssignMonitorLeader WatchMasterNode with rc = %d\n", method_name, __LINE__, rc
);
+ }
+ }
+ }
+ else
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY |
TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ {
+ trace_printf("%s@%d" " (MasterMonitor)
AssignMonitorLeader Unable to set create or set watch\n", method_name,
__LINE__);
+ }
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Unable to set create or set watch on
master node %s\n"
+ , method_name, node->GetName() );
+ mon_log_write(MON_ZCLIENT_CREATEORSETMASTERWATCH, SQ_LOG_ERR,
buf);
+ }
+ }
+
+ break;
+ }
+
+ TRACE_EXIT;
+}
+
// Assigns a new TMLeader if given pnid is same as TmLeaderNid
// TmLeader is a logical node num.
// pnid has gone down, so if that node was previously the TM leader, a new one
needs to be chosen.
@@ -494,6 +625,7 @@ CCluster::CCluster (void)
configPNodesMax_ (-1),
NodeMap (NULL),
TmLeaderNid (-1),
+ MonitorLeaderPNid (-1),
tmReadyCount_(0),
minRecvCount_(4096),
recvBuffer_(NULL),
@@ -529,6 +661,7 @@ CCluster::CCluster (void)
const char method_name[] = "CCluster::CCluster";
TRACE_ENTRY;
+ configMaster_ = -1;
MPI_Comm_set_errhandler(MPI_COMM_WORLD,MPI_ERRORS_RETURN);
char *env = getenv("SQ_MON_CHECK_SEQNUM");
@@ -548,6 +681,9 @@ CCluster::CCluster (void)
CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
+ // get master from CClusterConfig
+ configMaster_ = clusterConfig->GetConfigMaster();
+
// Compute minimum "sync cycles" per second. The minimum is 1/10
// the expected number, assuming "next_test_delay" cycles per second (where
// next_test_delay is in microseconds).
@@ -640,6 +776,21 @@ CCluster::~CCluster (void)
const char method_name[] = "CCluster::~CCluster";
TRACE_ENTRY;
+ if (epollFD_ != -1)
+ {
+ close( epollFD_ );
+ }
+
+ if (commSock_ != -1)
+ {
+ close( commSock_ );
+ }
+
+ if (syncSock_ != -1)
+ {
+ close( syncSock_ );
+ }
+
delete [] comms_;
delete [] otherMonRank_;
delete [] socks_;
@@ -677,26 +828,26 @@ unsigned long long
CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
unsigned long long seqNum = 0;
- for (int i = 0; i < GetConfigPNodesMax(); i++)
+ for (int i = 0; i < GetConfigPNodesCount(); i++)
{
if (trace_settings & TRACE_RECOVERY)
{
- trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n",
method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
+ trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n",
method_name, __LINE__, i, nodestate[indexToPnid_[i]].seq_num, seqNum );
}
- if (nodestate[i].seq_num > 1)
+ if (nodestate[indexToPnid_[i]].seq_num > 1)
{
if (seqNum == 0)
{
- seqNum = nodestate[i].seq_num;
+ seqNum = nodestate[indexToPnid_[i]].seq_num;
}
else
{
- assert(nodestate[i].seq_num == seqNum);
+ assert(nodestate[indexToPnid_[i]].seq_num == seqNum);
}
}
if (trace_settings & TRACE_RECOVERY)
{
- trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n",
method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
+ trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n",
method_name, __LINE__, i, nodestate[indexToPnid_[i]].seq_num, seqNum );
}
}
@@ -857,6 +1008,7 @@ void CCluster::HardNodeDown (int pnid, bool
communicate_state)
if ( ZClientEnabled )
{
ZClient->WatchNodeDelete( node->GetName() );
+ ZClient->WatchNodeMasterDelete( node->GetName() );
}
}
}
@@ -875,7 +1027,7 @@ void CCluster::HardNodeDown (int pnid, bool
communicate_state)
if ( Emulate_Down )
{
IAmIntegrated = false;
- AssignTmLeader(pnid, false);
+ AssignLeaders(pnid, false);
}
TRACE_EXIT;
@@ -976,7 +1128,7 @@ void CCluster::SoftNodeDown( int pnid )
}
IAmIntegrated = false;
- AssignTmLeader(pnid, false);
+ AssignLeaders(pnid, false);
TRACE_EXIT;
}
@@ -1237,8 +1389,8 @@ int CCluster::HardNodeUp( int pnid, char *node_name )
TRACE_ENTRY;
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf( "%s@%d - pnid=%d, name=%s\n"
- , method_name, __LINE__, pnid, node_name );
+ trace_printf( "%s@%d - pnid=%d, name=%s (MyPNID = %d)\n"
+ , method_name, __LINE__, pnid, node_name, MyPNID );
if ( pnid == -1 )
{
@@ -2252,7 +2404,7 @@ void CCluster::HandleOtherNodeMsg (struct
internal_msg_def *recv_msg,
{
case SyncType_TmData:
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
- trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d)\n",
method_name, __LINE__, Node[pnid]->GetName(), pnid);
+ trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d),
(phase=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid,
MyNode->GetPhase());
if ( ! MyNode->IsSpareNode() && MyNode->GetPhase() != Phase_Ready )
{
MyNode->CheckActivationPhase();
@@ -2871,16 +3023,49 @@ void CCluster::InitializeConfigCluster( void )
InitServerSock();
}
- // The new monitor in a real cluster initializes all
- // existing nodes to a down state.
- // ReIntegrate() will set the state to up when communication is
established.
- if ( IAmIntegrating )
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf( "%s@%d (MasterMonitor) IAmIntegrating=%d,"
+ " IsAgentMode=%d, IsMaster=%d,"
+ " MasterMonitorName=%s, Node_name=%s\n"
+ , method_name, __LINE__
+ , IAmIntegrating
+ , IsAgentMode, IsMaster, MasterMonitorName, Node_name );
+ }
+
+ if (IAmIntegrating || IsAgentMode)
{
+ int TmLeaderPNid = -1;
+ if (IsMaster)
+ {
+ TmLeaderNid = Nodes->GetFirstNid();
+ TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
+ }
+ // Non-master monitors in AGENT mode in a real cluster initialize all
+ // remote nodes to a down state. The master monitor and the joining
+ // monitors will set the joining node state to up as part of the node
+ // re-integration processing as monitor processes join the cluster
+ // through the master.
for (int i=0; i < clusterConfig->GetPNodesCount(); i++)
{
- if ( Node[indexToPnid_[i]] && Node[indexToPnid_[i]]->GetPNid() !=
MyPNID )
+ if (Node[indexToPnid_[i]])
{
- Node[indexToPnid_[i]]->SetState( State_Down );
+ if (Node[indexToPnid_[i]]->GetPNid() == MyPNID)
+ { // Set bit indicating node is up
+ upNodes_.upNodes[indexToPnid_[i]/MAX_NODE_BITMASK] |=
+ (1ull << (indexToPnid_[i]%MAX_NODE_BITMASK));
+ }
+ else
+ { // Set node state to down
+ Node[indexToPnid_[i]]->SetState( State_Down );
+ if (IsMaster)
+ {
+ if (TmLeaderPNid == indexToPnid_[i])
+ {
+ AssignTmLeader(indexToPnid_[i], false);
+ }
+ }
+ }
}
}
}
@@ -3060,6 +3245,23 @@ void CCluster::InitializeConfigCluster( void )
if (nodeNames) delete [] nodeNames;
}
+ if ( CommType == CommType_Sockets )
+ {
+ // Allgather() cluster sockets are established as remote
+ // monitor processes join the cluster
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ for ( int i =0; i < clusterConfig->GetPNodesCount() ; i++ )
+ {
+ trace_printf( "%s@%d %s (%d), state=%s, socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[i]]->GetName()
+ , Node[indexToPnid_[i]]->GetPNid()
+ , StateString(Node[indexToPnid_[i]]->GetState())
+ , indexToPnid_[i], socks_[indexToPnid_[i]]);
+ }
+ }
+ }
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
for ( int i =0; i < MAX_NODE_MASKS ; i++ )
@@ -3072,7 +3274,10 @@ void CCluster::InitializeConfigCluster( void )
// Kill the MPICH hydra_pmi_proxy to prevent it from killing all
// processes in cluster when mpirun or monitor processes are killed
- kill( getppid(), SIGKILL );
+ if (!IsAgentMode || (IsAgentMode && IsMPIChild))
+ {
+ kill( getppid(), SIGKILL );
+ }
TRACE_EXIT;
}
@@ -3807,10 +4012,31 @@ void CCluster::ReIntegrateSock( int initProblem )
TEST_POINT( TP010_NODE_UP );
// Connect with my creator monitor
- joinSock_ = Monitor->Connect( IntegratingMonitorPort );
- if ( joinSock_ < 0 )
+ bool lv_done = false;
+ bool lv_did_not_connect_in_first_attempt = false;
+ while ( ! lv_done )
{
- HandleReintegrateError( joinSock_, Reintegrate_Err1, -1, NULL, true );
+ joinSock_ = Monitor->Connect( IntegratingMonitorPort );
+ if ( joinSock_ < 0 )
+ {
+ if ( IsAgentMode )
+ {
+ lv_did_not_connect_in_first_attempt = true;
+ sleep( 15 );
+ }
+ else
+ {
+ HandleReintegrateError( joinSock_, Reintegrate_Err1, -1, NULL,
true );
+ }
+ }
+ else
+ {
+ if ( lv_did_not_connect_in_first_attempt )
+ {
+ sleep( 10 );
+ }
+ lv_done = true;
+ }
}
mem_log_write(CMonLog::MON_REINTEGRATE_4, MyPNID);
@@ -4281,8 +4507,6 @@ void CCluster::SetIntegratingPNid( int pnid )
TRACE_ENTRY;
integratingPNid_ = pnid;
- // Indicate to the commAcceptor thread to stop accepting connections
- CommAccept.stopAccepting();
TRACE_EXIT;
}
@@ -6181,8 +6405,8 @@ void CCluster::HandleDownNode( int pnid )
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n",
method_name, __LINE__, downNode->GetPNid(), downNode->GetName());
- // assign new TmLeader if TMLeader node is dead.
- AssignTmLeader(pnid, false);
+ // assign new leaders if needed
+ AssignLeaders(pnid, false);
// Build available list of spare nodes
CNode *spareNode;
@@ -8276,6 +8500,19 @@ int CCluster::MkSrvSock( int *pport )
return ( -1 );
}
+ int reuse = 1; // sockopt reuse option
+ if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse,
sizeof(int) ) )
+ {
+ char la_buf[MON_STRING_BUF_SIZE];
+ int err = errno;
+ sprintf( la_buf, "[%s], setsockopt(SO_REUSEADDR) failed! errno=%d
(%s)\n"
+ , method_name, err, strerror( err ));
+ mon_log_write(MON_CLUSTER_MKSRVSOCK_4, SQ_LOG_ERR, la_buf);
+ close( sock );
+ return ( -1 );
+ }
+
+
// Bind socket.
size = sizeof(sockinfo);
memset( (char *) &sockinfo, 0, size );
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/cluster.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h
index 58d3540..6b658ae 100644
--- a/core/sqf/monitor/linux/cluster.h
+++ b/core/sqf/monitor/linux/cluster.h
@@ -113,7 +113,9 @@ public:
#ifndef USE_BARRIER
void ArmWakeUpSignal (void);
#endif
+ void AssignLeaders( int pnid, bool checkProcess );
void AssignTmLeader( int pnid, bool checkProcess );
+ void AssignMonitorLeader( int pnid );
void stats();
void CompleteSyncCycle()
{ syncCycle_.lock(); syncCycle_.wait(); syncCycle_.unlock(); }
@@ -124,6 +126,8 @@ public:
void ExpediteDown( void );
inline int GetTmLeader( void ) { return( TmLeaderNid); }
inline void SetTmLeader( int tmLeaderNid ) { TmLeaderNid = tmLeaderNid; }
+ inline int GetMonitorLeader( void ) { return( MonitorLeaderPNid); }
+ inline void SetMonitorLeader( int monitorLeaderPNid ) { MonitorLeaderPNid
= monitorLeaderPNid; }
int GetDownedNid( void );
inline int GetTmSyncPNid( void ) { return( TmSyncPNid ); } // Physical
Node ID of current TmSync operations master
void InitClusterComm(int worldSize, int myRank, int *rankToPnid);
@@ -177,6 +181,7 @@ public:
bool ReinitializeConfigCluster( bool nodeAdded, int pnid );
int incrGetVerifierNum();
+ int getConfigMaster() { return configMaster_; }
enum { SYNC_MAX_RESPONSIVE = 1 }; // Max seconds before sync thread is
"stuck"
@@ -201,6 +206,7 @@ protected:
int syncSock_;
int epollFD_;
int *indexToPnid_;
+ int configMaster_;
CNode **Node; // array of nodes
CLNode **LNode; // array of logical nodes
@@ -229,6 +235,7 @@ private:
int configPNodesMax_; // max # of physical nodes that can be
configured
int *NodeMap; // Mapping of Node ranks to COMM_WORLD ranks
int TmLeaderNid; // Nid of currently assigned TM Leader node
+ int MonitorLeaderPNid; // PNid of currently assigned Monitor leader
node
int tmReadyCount_; // # of DTM processes ready for transactions
size_t minRecvCount_; // minimum size of receive buffer for allgather
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/commaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/commaccept.cxx
b/core/sqf/monitor/linux/commaccept.cxx
index 21b30a6..11c12d7 100644
--- a/core/sqf/monitor/linux/commaccept.cxx
+++ b/core/sqf/monitor/linux/commaccept.cxx
@@ -556,6 +556,25 @@ void CCommAccept::processNewSock( int joinFd )
node= Nodes->GetNode( nodeId.nodeName );
+ if ( node == NULL )
+ {
+ close( joinFd );
+
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], got connection from unknown "
+ "node %d (%s). Ignoring it.\n"
+ , method_name
+ , nodeId.pnid
+ , nodeId.nodeName);
+ mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);
+
+ // Requests is complete, begin accepting connections again
+ CommAccept.startAccepting();
+
+ return;
+ }
+
if ( nodeId.ping )
{
// Reply with my node info
@@ -595,6 +614,10 @@ void CCommAccept::processNewSock( int joinFd )
, method_name, node?node->GetName():"", ErrorMsg(rc));
mon_log_write(MON_COMMACCEPT_19, SQ_LOG_ERR, buf);
}
+
+ // Requests is complete, begin accepting connections again
+ CommAccept.startAccepting();
+
return;
}
@@ -607,53 +630,6 @@ void CCommAccept::processNewSock( int joinFd )
, nodeId.creatorShellVerifier );
}
- int pnid = -1;
- if ( node != NULL )
- { // Store port numbers for the node
- char commPort[MPI_MAX_PORT_NAME];
- char syncPort[MPI_MAX_PORT_NAME];
- strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME);
- strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME);
- char *pch1;
- char *pch2;
- pnid = nodeId.pnid;
-
- node->SetCommPort( commPort );
- pch1 = strtok (commPort,":");
- pch1 = strtok (NULL,":");
- node->SetCommSocketPort( atoi(pch1) );
-
- node->SetSyncPort( syncPort );
- pch2 = strtok (syncPort,":");
- pch2 = strtok (NULL,":");
- node->SetSyncSocketPort( atoi(pch2) );
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d),
syncPort=%s(%d)\n"
- , method_name, __LINE__
- , node->GetPNid()
- , node->GetName()
- , pch1, atoi(pch1)
- , pch2, atoi(pch2) );
- }
- }
- else
- {
- close( joinFd );
-
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], got connection from unknown "
- "node %d (%s). Ignoring it.\n"
- , method_name
- , nodeId.pnid
- , nodeId.nodeName);
- mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);
-
- return;
- }
-
// Sanity check, re-integrating node must be down
if ( node->GetState() != State_Down )
{
@@ -672,9 +648,43 @@ void CCommAccept::processNewSock( int joinFd )
, StateString(node->GetState()));
mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);
+ // Requests is complete, begin accepting connections again
+ CommAccept.startAccepting();
+
return;
}
+ int pnid = -1;
+
+ // Store port numbers for the node
+ char commPort[MPI_MAX_PORT_NAME];
+ char syncPort[MPI_MAX_PORT_NAME];
+ strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME);
+ strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME);
+ char *pch1;
+ char *pch2;
+ pnid = nodeId.pnid;
+
+ node->SetCommPort( commPort );
+ pch1 = strtok (commPort,":");
+ pch1 = strtok (NULL,":");
+ node->SetCommSocketPort( atoi(pch1) );
+
+ node->SetSyncPort( syncPort );
+ pch2 = strtok (syncPort,":");
+ pch2 = strtok (NULL,":");
+ node->SetSyncSocketPort( atoi(pch2) );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d),
syncPort=%s(%d)\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , pch1, atoi(pch1)
+ , pch2, atoi(pch2) );
+ }
+
mem_log_write(CMonLog::MON_CONNTONEWMON_4, pnid);
if ( MyNode->IsCreator() )
@@ -916,6 +926,8 @@ void CCommAccept::commAcceptorIB()
interComm = MPI_COMM_NULL;
rc = MPI_Comm_accept( MyCommPort, MPI_INFO_NULL, 0, MPI_COMM_SELF,
&interComm );
+ // Stop accepting connections until this request completes
+ CommAccept.stopAccepting();
}
else
{
@@ -988,6 +1000,8 @@ void CCommAccept::commAcceptorSock()
mem_log_write(CMonLog::MON_CONNTONEWMON_1);
joinFd = Monitor->AcceptCommSock();
+ // Stop accepting connections until this request completes
+ CommAccept.stopAccepting();
}
else
{
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/mlio.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/mlio.cxx b/core/sqf/monitor/linux/mlio.cxx
index 61803f8..7db35ec 100644
--- a/core/sqf/monitor/linux/mlio.cxx
+++ b/core/sqf/monitor/linux/mlio.cxx
@@ -1261,7 +1261,13 @@ SQ_LocalIOToClient::SQ_LocalIOToClient(int nid)
if (cmid == -1)
{
if (trace_settings & TRACE_INIT)
- trace_printf("%s@%d" " failed shmget(" "%d" "), errno=" "%d" "\n",
method_name, __LINE__, (shsize), errno);
+ {
+ int err = errno;
+ char la_buf[MON_STRING_BUF_SIZE];
+ trace_printf( "%s@%d" " failed shmget(%d), errno=%d (%s)\n"
+ , method_name, __LINE__
+ , (shsize), err, strerror(err) );
+ }
if ( errno == EEXIST)
{
// and try getting it with a smaller size
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/monitor.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/monitor.cxx
b/core/sqf/monitor/linux/monitor.cxx
index 70df7cc..124b1ff 100755
--- a/core/sqf/monitor/linux/monitor.cxx
+++ b/core/sqf/monitor/linux/monitor.cxx
@@ -53,6 +53,7 @@ using namespace std;
#include "tmsync.h"
#include "cluster.h"
#include "monitor.h"
+#include "props.h"
#ifdef DMALLOC
#include "dm.h"
@@ -99,12 +100,16 @@ char MySyncPort[MPI_MAX_PORT_NAME] = {'\0'};
char Node_name[MPI_MAX_PROCESSOR_NAME] = {'\0'};
sigset_t SigSet;
bool Emulate_Down = false;
-long next_test_delay = 10000; // in usec.
-
+long next_test_delay = 100000; // in usec. (default 100 msec)
+CClusterConfig *ClusterConfig = NULL;
bool IAmIntegrating = false;
bool IAmIntegrated = false;
char IntegratingMonitorPort[MPI_MAX_PORT_NAME] = {'\0'};
bool IsRealCluster = true;
+bool IsAgentMode = false;
+bool IsMaster = false;
+bool IsMPIChild = false;
+char MasterMonitorName[MAX_PROCESS_PATH]= {'\0'};
CommType_t CommType = CommType_Undefined;
bool SMSIntegrating = false;
int CreatorShellPid = -1;
@@ -865,9 +870,9 @@ void HandleNodeExpiration( const char *nodeName )
TRACE_EXIT;
}
-void CMonitor::CreateZookeeperClient( void )
+void CreateZookeeperClient( void )
{
- const char method_name[] = "CMonitor::CreateZookeeperClient";
+ const char method_name[] = "CreateZookeeperClient";
TRACE_ENTRY;
if ( ZClientEnabled )
@@ -961,9 +966,9 @@ void CMonitor::CreateZookeeperClient( void )
TRACE_EXIT;
}
-void CMonitor::StartZookeeperClient( void )
+void StartZookeeperClient( void )
{
- const char method_name[] = "CMonitor::StartZookeeperClient";
+ const char method_name[] = "StartZookeeperClient";
TRACE_ENTRY;
int rc = -1;
@@ -1043,19 +1048,71 @@ int main (int argc, char *argv[])
char temp_fname[MAX_PROCESS_PATH];
char buf[MON_STRING_BUF_SIZE];
unsigned int initSleepTime = 1; // 1 second
+
mallopt(M_ARENA_MAX, 4); // call to limit the number of arena's of
monitor to 4.This call doesn't seem to have any effect !
CALL_COMP_DOVERS(monitor, argc, argv);
const char method_name[] = "main";
+ if (argc < 2) {
+ printf("error: monitor needs an argument...exitting...\n");
+ exit(0);
+ }
+
+ int lv_arg_index = 1;
+ while ( lv_arg_index < argc )
+ {
+ // Installations like Cloudera Manager, the monitor is started in
AGENT mode
+ if ( strcmp( argv[lv_arg_index], "COLD_AGENT" ) == 0 )
+ {
+ IsAgentMode = true;
+ }
+
+ lv_arg_index++;
+ }
+
// Set flag to indicate whether we are operating in a real cluster
// or a virtual cluster. This is used throughout the monitor when
// behavior differs for a real vs. virtual cluster environment.
- if ( getenv("SQ_VIRTUAL_NODES") )
+ if ( !IsAgentMode )
{
- IsRealCluster = false;
- Emulate_Down = true;
+ if ( getenv( "SQ_VIRTUAL_NODES" ) )
+ {
+ IsRealCluster = false;
+ Emulate_Down = true;
+ }
+ if (IsRealCluster)
+ {
+ // The monitor processes may be started by MPIrun utility
+ env = getenv("SQ_MON_CREATOR");
+ if ( env != NULL && strcmp(env, "MPIRUN") == 0 )
+ {
+ IsMPIChild = true;
+ }
+ // The monitor can be set to run in AGENT mode
+ env = getenv("SQ_MON_RUN_MODE");
+ if ( env != NULL && strcmp(env, "AGENT") == 0 )
+ {
+ IsAgentMode = true;
+ }
+ }
+ }
+
+ if ( IsAgentMode )
+ {
+ MON_Props xprops( true );
+ xprops.load( "monitor.env" );
+ MON_Smap_Enum xenum( &xprops );
+ while ( xenum.more( ) )
+ {
+ char *xkey = xenum.next( );
+ const char *xvalue = xprops.get( xkey );
+ if ( xkey && xkey[0] && xvalue )
+ {
+ setenv( xkey, xvalue, 1 );
+ }
+ }
}
MonLog = new CMonLog( "log4cxx.monitor.mon.config", "MON", "alt.mon", -1,
-1, getpid(), "$MONITOR" );
@@ -1240,7 +1297,7 @@ int main (int argc, char *argv[])
abort();
}
- if (argc > 3 && strcmp (argv[2], "-integrate") == 0)
+ if ((!IsAgentMode) && (argc > 3 && strcmp (argv[2], "-integrate") == 0))
{
switch( CommType )
{
@@ -1257,13 +1314,13 @@ int main (int argc, char *argv[])
}
break;
case CommType_Sockets:
- if ( isdigit (*argv[3]) )
+ if ( IsAgentMode || isdigit (*argv[3]) )
{
// In agent mode and when re-integrating (node up), all
// monitors processes start as a cluster of 1 and join to
the
// creator monitor to establish the real cluster.
- // Therefore, MyPNID will always be zero when in and
- // it is necessary to use the node name to obtain the
correct
+ // Therefore, MyPNID will always be zero them it is
+ // necessary to use the node name to obtain the correct
// <pnid> from the configuration which occurs when
creating the
// CMonitor object down below. By setting MyPNID to -1,
when the
// CCluster::InitializeConfigCluster() invoked during the
creation
@@ -1306,8 +1363,15 @@ int main (int argc, char *argv[])
// Trace cannot be specified on startup command but need to
// check for trace environment variable settings.
MonTrace->mon_trace_init("0", NULL);
+
+ }
+
+ if (IsAgentMode)
+ {
+ CreatorShellPid = 1000; // per monitor.sh
+ CreatorShellVerifier = 0;
}
- else
+
if (argc == 3 && isdigit(*argv[2]) )
{
MonTrace->mon_trace_init(argv[2], "STDOUT");
@@ -1398,8 +1462,12 @@ int main (int argc, char *argv[])
MonStats->MonitorBusyIncr();
snprintf(buf, sizeof(buf),
- "[CMonitor::main], %s, Started! CommType: %s\n"
- , CALL_COMP_GETVERS2(monitor), CommTypeString( CommType ));
+ "[CMonitor::main], %s, Started! CommType: %s (%s%s%s)\n"
+ , CALL_COMP_GETVERS2(monitor)
+ , CommTypeString( CommType )
+ , IsRealCluster?"RealCluster":"VirtualCluster"
+ , IsAgentMode?"/AgentMode":""
+ , IsMPIChild?"/MPIChild":"" );
mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
#ifdef DMALLOC
@@ -1420,11 +1488,230 @@ int main (int argc, char *argv[])
// Create thread for monitoring redirected i/o.
// This is also used for monitor logs, so start it early.
Redirector.start();
+
+ // Create global configuration now
+ ClusterConfig = new CClusterConfig();
+ if (ClusterConfig)
+ {
+ bool traceEnabled = (trace_settings & TRACE_TRAFCONFIG) ? true :
false;
+ if (ClusterConfig->Initialize( traceEnabled,
MonTrace->getTraceFileName()))
+ {
+ if (!ClusterConfig->LoadConfig())
+ {
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf(la_buf, "[%s], Failed to load cluster
configuration.\n", method_name);
+ mon_log_write(MON_MONITOR_MAIN_12, SQ_LOG_CRIT, la_buf);
+
+ abort();
+ }
+ }
+ else
+ {
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf(la_buf, "[%s], Failed to open cluster
configuration.\n", method_name);
+ mon_log_write(MON_MONITOR_MAIN_13, SQ_LOG_CRIT, la_buf);
+
+ abort();
+ }
+ }
+ else
+ {
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf(la_buf, "[%s], Failed to allocate cluster
configuration.\n", method_name);
+ mon_log_write(MON_MONITOR_MAIN_14, SQ_LOG_CRIT, la_buf);
+
+ abort();
+ }
+
+ // Set up zookeeper and determine the master
+ if ( IsAgentMode || IsRealCluster )
+ {
+ // Zookeeper client is enabled only in a real cluster
+ env = getenv("SQ_MON_ZCLIENT_ENABLED");
+
+ if ( env )
+ {
+ if ( env && isdigit(*env) )
+ {
+ if ( strcmp(env,"0")==0 )
+ {
+ ZClientEnabled = false;
+ }
+ }
+ }
+
+ if ( ZClientEnabled )
+ {
+ CreateZookeeperClient( );
+ }
+ }
+ else
+ {
+ ZClientEnabled = false;
+ }
+
+ if (IsAgentMode)
+ {
+ if ((ZClientEnabled) && (ZClient != NULL))
+ {
+ // Do not wait, just see if one exists
+ const char *masterMonitor =
ZClient->WaitForAndReturnMaster(false);
- // CNodeContainer loads static configuration from database
- Nodes = new CNodeContainer ();
+ if (masterMonitor)
+ {
+ strcpy (MasterMonitorName, masterMonitor);
+ // unfortunately, we have to do this to see if we are the
master before
+ // other things are set up. This is how we must do that
+ if (strcmp(Node_name, masterMonitor) == 0)
+ {
+ IsMaster = true;
+ }
+ else
+ {
+ IsMaster = false;
+ }
+ }
+ else
+ {
+ strcpy (MasterMonitorName,
ClusterConfig->GetConfigMasterByName());
+ if (strcmp (Node_name,
ClusterConfig->GetConfigMasterByName()) == 0)
+ {
+ IsMaster = true;
+ }
+ else
+ {
+ IsMaster = false;
+ }
+ }
+
+ }
+ }
+
+ if (IsAgentMode)
+ {
+ if (!IsMaster)
+ {
+ MyPNID=-1;
+ SMSIntegrating = IAmIntegrating = true;
+ char *monitorPort = getenv ("MONITOR_COMM_PORT");
+ if (monitorPort)
+ {
+ strcpy( IntegratingMonitorPort, MasterMonitorName);
+ strcat( IntegratingMonitorPort, ":");
+ strcat( IntegratingMonitorPort, monitorPort);
+ }
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf( "%s@%d (MasterMonitor) IsAgentMode = TRUE, I
am NOT the master, "
+ "MyPNID=%d, master port=%s\n"
+ , method_name, __LINE__
+ , MyPNID, IntegratingMonitorPort );
+ }
+ }
+ else
+ {
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf( "%s@%d (MasterMonitor) IsAgentMode = TRUE, I
am the master, MyPNID=%d\n"
+ , method_name, __LINE__, MyPNID );
+ }
+ IAmIntegrating = false;
+ }
+ }
+ Nodes = new CNodeContainer ();
Config = new CConfigContainer ();
- Monitor = new CMonitor (procTermSig);
+ Monitor = new CMonitor (procTermSig);
+
+ if ( IsAgentMode )
+ {
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf( "%s@%d MyPNID=%d\n"
+ , method_name, __LINE__, MyPNID );
+ }
+ MonLog->setPNid( MyPNID );
+ }
+
+ if (IsAgentMode)
+ {
+ CNode *myNode = Nodes->GetNode(MyPNID);
+ const char *masterMonitor=NULL;
+ if (myNode == NULL)
+ {
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf( la_buf
+ , "[%s], Failed to get my Node, MyPNID=%d\n"
+ , method_name, MyPNID );
+ mon_log_write(MON_MONITOR_MAIN_15, SQ_LOG_CRIT, la_buf);
+
+ abort();
+ }
+
+ if ((ZClientEnabled) && (ZClient != NULL))
+ {
+ CNode *masterNode = Nodes->GetNode(MasterMonitorName);
+ if (!masterNode)
+ {
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf("%s@%d (MasterMonitor) IsMaster == %d,
masterNode is NULL, with MasterMonitorName %s\n", method_name, __LINE__,
IsMaster, MasterMonitorName);
+ }
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf(la_buf, "[%s], Failed to get my Master Node.\n",
method_name);
+ mon_log_write(MON_MONITOR_MAIN_16, SQ_LOG_CRIT, la_buf);
+
+ abort();
+ }
+ else
+ {
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf("%s@%d (MasterMonitor) IsMaster == %d,
masterNode=%s\n", method_name, __LINE__, IsMaster, masterNode->GetName() );
+ }
+ }
+ Monitor->SetMonitorLeader( masterNode->GetPNid() );
+ if (MyPNID == masterNode->GetPNid())
+ {
+ ZClient->CreateMasterZNode ( myNode->GetName() );
+ strcpy (MasterMonitorName, myNode->GetName());
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf("%s@%d (MasterMonitor) IsMaster == %d,
set monitor lead to %d\n", method_name, __LINE__, IsMaster, MyPNID);
+ }
+ }
+ else
+ {
+ masterMonitor = ZClient->WaitForAndReturnMaster(true);
+ CNode *masterNode = NULL;
+ if (masterMonitor)
+ {
+ strcpy (MasterMonitorName, masterMonitor);
+ masterNode = Nodes->GetNode(MasterMonitorName);
+ }
+
+ if (masterNode)
+ {
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf("%s@%d (MasterMonitor) IsMaster ==
%d, set monitor lead to %d\n", method_name, __LINE__, IsMaster,
masterNode->GetPNid());
+ }
+ Monitor->SetMonitorLeader( masterNode->GetPNid() );
+ }
+ else
+ {
+ if (trace_settings & TRACE_INIT)
+ {
+ trace_printf("%s@%d (MasterMonitor) IsMaster ==
%d, masterNode is NULL, with MasterMonitorName %s\n", method_name, __LINE__,
IsMaster, MasterMonitorName);
+ }
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf(la_buf, "[%s], Failed to get my Master
Node.\n", method_name);
+ mon_log_write(MON_MONITOR_MAIN_17, SQ_LOG_CRIT,
la_buf);
+
+ abort();
+ }
+ }
+ }
+ }
if (!IAmIntegrating)
{
Config->Init ();
@@ -1493,7 +1780,6 @@ int main (int argc, char *argv[])
{
strcpy (Node_name, myNode->GetName());
}
-
// create with no caching, user read/write, group read/write, other
read
fd = open( port_fname
, O_RDWR | O_TRUNC | O_CREAT | O_DIRECT
@@ -1539,7 +1825,6 @@ int main (int argc, char *argv[])
MPI_Abort(MPI_COMM_SELF,99);
}
free( ioBuffer );
-
int ret = SQ_theLocalIOToClient->initWorker();
if (ret)
{
@@ -1566,33 +1851,7 @@ int main (int argc, char *argv[])
printf("%s@%d" " RLIMIT_SIGPENDING cur=%d, max=%d\n",
method_name, __LINE__, (int)Rl.rlim_cur, (int)Rl.rlim_max);
}
}
-
- if ( IsRealCluster )
- {
- // Zookeeper client is enabled only in a real cluster
- env = getenv("SQ_MON_ZCLIENT_ENABLED");
- if ( env )
- {
- if ( env && isdigit(*env) )
- {
- if ( strcmp(env,"0")==0 )
- {
- ZClientEnabled = false;
- }
- }
- }
-
- if ( ZClientEnabled )
- {
- Monitor->CreateZookeeperClient();
- }
- }
- else
- {
- ZClientEnabled = false;
- }
-
- if ( IAmIntegrating )
+ if ( IAmIntegrating )
{
// This monitor is integrating to (joining) an existing cluster
Monitor->ReIntegrate( 0 );
@@ -1602,7 +1861,7 @@ int main (int argc, char *argv[])
trace_printf("%s@%d" " After UpdateCluster" "\n", method_name,
__LINE__);
}
else
- {
+ {
Monitor->EnterSyncCycle();
done = Monitor->exchangeNodeData();
Monitor->ExitSyncCycle();
@@ -1618,7 +1877,18 @@ int main (int argc, char *argv[])
{
if ( ZClientEnabled )
{
- Monitor->StartZookeeperClient();
+ {
+ StartZookeeperClient();
+ // Set watch for master
+ if (IsAgentMode)
+ {
+ ZClient->WatchMasterNode( MasterMonitorName );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d (MasterMonitor) set watch for
MasterMonitorName %s\n", method_name, __LINE__, MasterMonitorName );
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/monitor.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/monitor.h b/core/sqf/monitor/linux/monitor.h
index 1b44c57..49308b9 100644
--- a/core/sqf/monitor/linux/monitor.h
+++ b/core/sqf/monitor/linux/monitor.h
@@ -63,7 +63,6 @@ public:
~CMonitor( void );
bool CompleteProcessStartup( struct message_def *msg );
- void CreateZookeeperClient( void );
void IncOpenCount(void);
void IncNoticeCount(void);
void IncProcessCount(void);
@@ -71,7 +70,6 @@ public:
void DecrNoticeCount(void);
void DecrProcessCount(void);
void StartPrimitiveProcesses( void );
- void StartZookeeperClient( void );
void openProcessMap ( void );
void writeProcessMapEntry ( const char * buf );
void writeProcessMapBegin( const char *name
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/pnode.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx
index 75c2137..485d013 100644
--- a/core/sqf/monitor/linux/pnode.cxx
+++ b/core/sqf/monitor/linux/pnode.cxx
@@ -49,6 +49,8 @@ using namespace std;
#include "replicate.h"
#include "reqqueue.h"
+#include "healthcheck.h"
+
extern CReqQueue ReqQueue;
extern char MyPath[MAX_PROCESS_PATH];
extern int MyPNID;
@@ -64,9 +66,13 @@ extern CNode *MyNode;
extern CMonStats *MonStats;
extern CRedirector Redirector;
extern CReplicate Replicator;
+extern CHealthCheck HealthCheck;
extern CMonTrace *MonTrace;
-
+extern bool IsAgentMode;
extern bool IAmIntegrating;
+extern char MasterMonitorName[MAX_PROCESS_PATH];
+extern char Node_name[MPI_MAX_PROCESSOR_NAME];
+extern CClusterConfig *ClusterConfig;
const char *StateString( STATE state);
const char *SyncStateString( SyncState state);
@@ -464,13 +470,14 @@ void CNode::CheckActivationPhase( void )
int tmCount = 0;
CLNode *lnode;
CProcess *process;
- bool tmReady;
+ bool tmReady = false;
const char method_name[] = "CNode::CheckActivationPhase";
TRACE_ENTRY;
// check for a TM process in each lnode
lnode = GetFirstLNode();
+
tmReady = lnode ? true : false;
for ( ; lnode ; lnode = lnode->GetNextP() )
{
@@ -1701,8 +1708,11 @@ void CNodeContainer::AddNodes( )
}
else
{
- if (pnid >= maxNode) // only for workstation acting as single node
- rank = -1;
+ if (pnid >= maxNode) // only for workstation acting as single node
+// || (IsAgentMode &&(strcmp( MasterMonitorName, Node_name ) !=
0)))
+ {
+ rank = -1; // -1 creates node in down state
+ }
node = new CNode( (char *)pnodeConfig->GetName(), pnid, rank );
assert( node != NULL );
}
@@ -3134,7 +3144,7 @@ void CNodeContainer::SetupCluster( CNode ***pnode_list,
CLNode ***lnode_list, in
if ( node->GetState() == State_Up && node->IsSpareNode() )
{
spareNodesConfigList_.push_back( node );
- if ( IAmIntegrating )
+ if (IAmIntegrating)
{
// do nothing. spareNodesList will get populated in the
join phase.
}
@@ -3166,40 +3176,11 @@ void CNodeContainer::LoadConfig( void )
const char method_name[] = "CNodeContainer::LoadConfig";
TRACE_ENTRY;
+ // The configuration is now global. To minimize impact for the time
being, just set the local
+ // pointer to the global configuration
if ( !clusterConfig_ )
{
- clusterConfig_ = new CClusterConfig();
- }
- if ( clusterConfig_ )
- {
- bool traceEnabled = (trace_settings & TRACE_TRAFCONFIG) ? true : false;
- if ( clusterConfig_->Initialize( traceEnabled,
MonTrace->getTraceFileName() ) )
- {
- if ( ! clusterConfig_->LoadConfig() )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf(la_buf, "[%s], Failed to load cluster
configuration.\n", method_name);
- mon_log_write(MON_NODECONT_LOAD_CONFIG_1, SQ_LOG_CRIT, la_buf);
-
- abort();
- }
- }
- else
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf(la_buf, "[%s], Failed to open cluster configuration.\n",
method_name);
- mon_log_write(MON_NODECONT_LOAD_CONFIG_2, SQ_LOG_CRIT, la_buf);
-
- abort();
- }
- }
- else
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf(la_buf, "[%s], Failed to allocate cluster configuration.\n",
method_name);
- mon_log_write(MON_NODECONT_LOAD_CONFIG_3, SQ_LOG_CRIT, la_buf);
-
- abort();
+ clusterConfig_ = ClusterConfig;
}
TRACE_EXIT;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/process.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/process.cxx
b/core/sqf/monitor/linux/process.cxx
index 6a8e08b..bce018b 100644
--- a/core/sqf/monitor/linux/process.cxx
+++ b/core/sqf/monitor/linux/process.cxx
@@ -72,6 +72,9 @@ extern CReqQueue ReqQueue;
#include "replicate.h"
+extern bool IsAgentMode;
+extern bool IsMaster;
+
extern bool PidMap;
extern int Measure;
extern int trace_level;
@@ -1651,13 +1654,39 @@ bool CProcess::Create (CProcess *parent, int & result)
}
string LDpath;
- if ( ldpathStrId_.nid != -1 )
- Config->strIdToString(ldpathStrId_, LDpath);
- if ( !LDpath.empty() )
+ static bool sv_getenv_ld_library_path_done = false;
+ static string sv_ld_library_path;
+ if (IsAgentMode)
{
- setEnvStrVal ( childEnv, nextEnv, "LD_LIBRARY_PATH", LDpath.c_str() );
+    if (! sv_getenv_ld_library_path_done)
+    {
+        sv_getenv_ld_library_path_done = true;
+        // getenv() returns NULL when LD_LIBRARY_PATH is unset; assigning a
+        // NULL char* to std::string is undefined behavior, so fall back to "".
+        const char *ldLibraryPathEnv = getenv( "LD_LIBRARY_PATH" );
+        sv_ld_library_path = ldLibraryPathEnv ? ldLibraryPathEnv : "";
+        if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL))
+        {
+            trace_printf( "%s@%d" " - LD_LIBRARY_PATH = " "%s" "\n", method_name, __LINE__, sv_ld_library_path.c_str() );
+        }
+    }
+ LDpath = sv_ld_library_path;
if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL |
TRACE_PROCESS_DETAIL))
- trace_printf("%s@%d - LD_LIBRARY_PATH = %s\n", method_name,
__LINE__, LDpath.c_str());
+ {
+ trace_printf( "%s@%d" " - LD_LIBRARY_PATH = " "%s" "\n",
method_name, __LINE__, LDpath.c_str() );
+ }
+ }
+ else
+ {
+ if (ldpathStrId_.nid != -1)
+ {
+ Config->strIdToString( ldpathStrId_, LDpath );
+ }
+ }
+ if (!LDpath.empty())
+ {
+ setEnvStrVal( childEnv, nextEnv, "LD_LIBRARY_PATH", LDpath.c_str( ) );
+ if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL |
TRACE_PROCESS_DETAIL))
+ {
+ trace_printf( "%s@%d - LD_LIBRARY_PATH = %s\n", method_name,
__LINE__, LDpath.c_str() );
+ }
}
setEnvStr ( childEnv, nextEnv, "LD_BIND_NOW=true" );
@@ -1695,15 +1724,39 @@ bool CProcess::Create (CProcess *parent, int & result)
trace_printf("%s@%d - PWD=%s\n", method_name, __LINE__,
pwd.c_str());
}
-
-
string path;
- if ( pathStrId_.nid != -1 )
- Config->strIdToString( pathStrId_, path);
- setEnvStrVal ( childEnv, nextEnv, "PATH", path.c_str() );
+ static bool sv_getenv_path_done = false;
+ static string sv_path;
+ if (IsAgentMode)
+ {
+    if (! sv_getenv_path_done)
+    {
+        sv_getenv_path_done = true;
+        // getenv() returns NULL when PATH is unset; assigning a NULL char*
+        // to std::string is undefined behavior, so fall back to "".
+        const char *pathEnv = getenv( "PATH" );
+        sv_path = pathEnv ? pathEnv : "";
+        if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL))
+        {
+            trace_printf( "%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__, sv_path.c_str() );
+        }
+    }
+ path = sv_path;
+ if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL |
TRACE_PROCESS_DETAIL))
+ {
+ trace_printf( "%s@%d" " - PATH = " "%s" "\n", method_name,
__LINE__, path.c_str() );
+ }
+ }
+ else
+ {
+ if (pathStrId_.nid != -1)
+ {
+ Config->strIdToString( pathStrId_, path );
+ }
+ }
+ setEnvStrVal( childEnv, nextEnv, "PATH", path.c_str( ) );
if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL |
TRACE_PROCESS_DETAIL))
- trace_printf("%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__,
path.c_str());
+ {
+ trace_printf( "%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__,
path.c_str() );
+ }
// Set values from registry as environment variables
setEnvFromRegistry ( childEnv, nextEnv );
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/reqqueue.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqqueue.cxx
b/core/sqf/monitor/linux/reqqueue.cxx
index e4e8dbd..b4d2529 100644
--- a/core/sqf/monitor/linux/reqqueue.cxx
+++ b/core/sqf/monitor/linux/reqqueue.cxx
@@ -56,6 +56,7 @@ extern int req_type_startup;
extern bool IAmIntegrating;
extern bool IAmIntegrated;
+
extern CommType_t CommType;
CReqResource::CReqResource()
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/zclient.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zclient.cxx
b/core/sqf/monitor/linux/zclient.cxx
index 36a0600..107cf32 100644
--- a/core/sqf/monitor/linux/zclient.cxx
+++ b/core/sqf/monitor/linux/zclient.cxx
@@ -488,6 +488,103 @@ int CZClient::ZooExistRetry(zhandle_t *zh, const char
*path, int watch, struct S
return rc;
}
+// Look up the master monitor registered under the <root><instance>/master
+// znode and return the name of its first child znode.
+//
+// doWait == false: probe once; return NULL if no master is registered.
+// doWait == true : retry once per second, up to 180 attempts (~3 minutes),
+//                  until the master znode exists and has a child.
+//
+// Returns NULL when no master is found or a ZooKeeper error occurs.
+// The returned pointer refers to storage owned by this function and is
+// only valid until the next call; callers are expected to copy it.
+const char* CZClient::WaitForAndReturnMaster( bool doWait )
+{
+    const char method_name[] = "CZClient::WaitForAndReturnMaster";
+    TRACE_ENTRY;
+
+    // Holds the name handed back to the caller so the ZooKeeper
+    // String_vector can be released before returning.
+    static string sv_masterName;
+
+    bool found = false;
+    int rc = -1;
+    int retries = 0;
+    Stat stat;
+
+    struct String_vector nodes = {0, NULL};
+    stringstream ss;
+    ss.str( "" );
+    ss << zkRootNode_.c_str()
+       << zkRootNodeInstance_.c_str()
+       << ZCLIENT_MASTER_ZNODE;
+    string masterMonitor( ss.str( ) );
+
+    // Wait up to 3 minutes (180 one-second retries) before giving up.
+    while ( (!found) && (retries < 180) )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d trafCluster=%s\n"
+                        , method_name, __LINE__, masterMonitor.c_str() );
+        }
+        // Verify the existence of the parent ZCLIENT_MASTER_ZNODE
+        rc = ZooExistRetry( ZHandle, masterMonitor.c_str( ), 0, &stat );
+
+        if ( rc == ZNONODE )
+        {
+            if (doWait == false)
+            {
+                break;
+            }
+            // The master znode does not exist yet. Sleep and count the
+            // attempt; without this the loop spins forever at full CPU.
+            usleep(1000000);
+            retries++;
+            continue;
+        }
+        else if ( rc == ZOK )
+        {
+            // Get the list of child znodes under the master znode.
+            rc = zoo_get_children( ZHandle, masterMonitor.c_str( ), 0, &nodes );
+            if ( rc == ZOK && nodes.count > 0 )
+            {
+                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                {
+                    trace_printf( "%s@%d nodes.count=%d\n"
+                                , method_name, __LINE__
+                                , nodes.count );
+                }
+                found = true;
+            }
+            else
+            {
+                // Release any (empty) result before the next attempt.
+                deallocate_String_vector( &nodes );
+                if (doWait == false)
+                {
+                    break;
+                }
+                usleep(1000000); // sleep for a second so as not to overwhelm the system
+                retries++;
+                continue;
+            }
+        }
+        else // error
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d Error (MasterMonitor) WaitForAndReturnMaster returned rc (%d), retries %d\n"
+                            , method_name, __LINE__, rc, retries );
+            }
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], ZooExistRetry() for %s failed with error %s\n"
+                    , method_name, masterMonitor.c_str( ), zerror(rc));
+            mon_log_write(MON_ZCLIENT_WAITFORANDRETURNMASTER, SQ_LOG_ERR, buf);
+            break;
+        }
+    }
+
+    const char *masterName = NULL;
+    if (found)
+    {
+        // NOTE(review): only one child is expected; should nodes.count == 1 be asserted?
+        sv_masterName = nodes.data[0];
+        masterName = sv_masterName.c_str();
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s)\n"
+                        , method_name, __LINE__, masterMonitor.c_str() );
+        }
+    }
+    deallocate_String_vector( &nodes );
+
+    TRACE_EXIT;
+    return masterName;
+}
+
int CZClient::GetClusterZNodes( String_vector *nodes )
{
const char method_name[] = "CZClient::GetClusterZNodes";
@@ -700,7 +797,7 @@ void CZClient::HandleExpiredZNode( void )
int CZClient::InitializeZClient( void )
{
- const char method_name[] = "CZClient::MakeClusterZNodes";
+ const char method_name[] = "CZClient::InitializeZClient";
TRACE_ENTRY;
int rc;
@@ -799,6 +896,67 @@ bool CZClient::IsZNodeExpired( const char *nodeName, int
&zerr )
return( expired );
}
+// Register this monitor as master by creating the ephemeral znode
+// <root><instance>/master/<nodeName> with data "<nodeName>:<MyPNID>".
+// RegisterZNode() is retried up to ZOOKEEPER_RETRY_COUNT times, sleeping
+// ZOOKEEPER_RETRY_WAIT seconds between attempts, while it reports connection
+// loss or operation timeout. Returns ZOK on success, otherwise the ZooKeeper
+// error (which is also logged).
+int CZClient::CreateMasterZNode( const char *nodeName )
+{
+    const char method_name[] = "CZClient::CreateMasterZNode";
+    TRACE_ENTRY;
+
+    int rc;
+    int retries = 0;
+
+    stringstream masterpath;
+    masterpath.str( "" );
+    masterpath << zkRootNode_.c_str()
+               << zkRootNodeInstance_.c_str()
+               << ZCLIENT_MASTER_ZNODE << "/"
+               << nodeName;
+
+    string monZnode = masterpath.str( );
+
+    // znode data identifies the master as "<nodeName>:<pnid>"
+    stringstream ss;
+    ss.str( "" );
+    ss << nodeName << ":" << MyPNID;
+    string monData = ss.str( );
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d RegisterZNode(%s:%s)\n"
+                    , method_name, __LINE__
+                    , monZnode.c_str()
+                    , monData.c_str() );
+    }
+
+    rc = RegisterZNode( monZnode.c_str(), monData.c_str(), ZOO_EPHEMERAL );
+    while ( ((rc == ZCONNECTIONLOSS) || (rc == ZOPERATIONTIMEOUT))
+         && retries < ZOOKEEPER_RETRY_COUNT )
+    {
+        sleep(ZOOKEEPER_RETRY_WAIT);
+        retries++;
+        rc = RegisterZNode( monZnode.c_str(), monData.c_str(), ZOO_EPHEMERAL );
+    }
+
+    if (rc != ZOK)
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d Error (MasterMonitor) Create master node for %s with rc = %d\n"
+                        , method_name, __LINE__, monZnode.c_str( ), rc);
+        }
+        // Report the znode path that RegisterZNode() failed to create.
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], RegisterZNode(%s) failed with error %s\n"
+                , method_name, monZnode.c_str(), zerror(rc) );
+        mon_log_write(MON_ZCLIENT_CREATEMASTERZNODE, SQ_LOG_ERR, buf);
+        TRACE_EXIT;
+        return(rc); // Return the error
+    }
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d (MasterMonitor) Created master node for %s with rc = %d\n"
+                    , method_name, __LINE__, monZnode.c_str( ), rc);
+    }
+    TRACE_EXIT;
+    return(rc);
+}
+
int CZClient::MakeClusterZNodes( void )
{
const char method_name[] = "CZClient::MakeClusterZNodes";
@@ -908,6 +1066,40 @@ int CZClient::MakeClusterZNodes( void )
break;
}
+ ss.str( "" );
+ ss << zkRootNode_.c_str()
+ << zkRootNodeInstance_.c_str()
+ << ZCLIENT_MASTER_ZNODE;
+ string masterDir( ss.str( ) );
+
+ rc = ZooExistRetry( ZHandle, masterDir.c_str( ), 0, &stat );
+ switch (rc)
+ {
+ case ZOK:
+ break;
+ case ZNONODE:
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d RegisterZNode(%s)\n"
+ , method_name, __LINE__
+ , masterDir.c_str() );
+ }
+ rc = RegisterZNode( masterDir.c_str(), NULL, 0 );
+ if ( rc && rc != ZNODEEXISTS )
+ {
+ return(rc); // Return the error
+ }
+ rc = ZOK;
+ break;
+ default:
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], zoo_exists(%s) failed with error %s\n"
+ , method_name, masterDir.c_str(), zerror(rc) );
+ mon_log_write(MON_ZCLIENT_CHECKCLUSTERZNODES_3, SQ_LOG_ERR, buf);
+ break;
+ }
+
TRACE_EXIT;
return(rc);
}
@@ -1484,6 +1676,53 @@ void CZClient::WatchCluster( void )
TRACE_EXIT;
}
+// Set a ZooKeeper watch on the master znode <root><instance>/master/<nodeName>.
+// SetZNodeWatch() is called while holding this client's lock. A failure is
+// traced and logged (MON_ZCLIENT_WATCHNODE_1) but not otherwise handled.
+// Returns the rc from SetZNodeWatch().
+int CZClient::WatchMasterNode( const char *nodeName )
+{
+    const char method_name[] = "CZClient::WatchMasterNode";
+    TRACE_ENTRY;
+
+    string monZnode( zkRootNode_.c_str() );
+    monZnode += zkRootNodeInstance_.c_str();
+    monZnode += ZCLIENT_MASTER_ZNODE;
+    monZnode += "/";
+    monZnode += nodeName;
+
+    lock();
+    int rc = SetZNodeWatch( monZnode );
+    unlock();
+
+    if ( rc == ZOK )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d (MasterMonitor) WatchMasterNode set on monZnode=%s\n"
+                        , method_name, __LINE__
+                        , monZnode.c_str() );
+        }
+    }
+    else
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d Error (MasterMonitor) WatchMasterNode failed with rc = %d for %s\n"
+                        , method_name, __LINE__
+                        , rc
+                        , nodeName );
+        }
+        char errBuf[MON_STRING_BUF_SIZE];
+        snprintf( errBuf, sizeof(errBuf)
+                , "[%s], SetZNodeWatch(%s) failed!\n"
+                , method_name
+                , monZnode.c_str() );
+        mon_log_write(MON_ZCLIENT_WATCHNODE_1, SQ_LOG_ERR, errBuf);
+    }
+
+    TRACE_EXIT;
+    return(rc);
+}
+
int CZClient::WatchNode( const char *nodeName )
{
const char method_name[] = "CZClient::WatchNode";
@@ -1524,6 +1763,108 @@ int CZClient::WatchNode( const char *nodeName )
return(rc);
}
+// Delete the master znode <root><instance>/master/<nodeName>.
+// ZNONODE is tolerated (this may be called unconditionally). Connection loss
+// and operation timeout are logged and reported as ZOK. Other errors are
+// logged; the listed unrecoverable codes are handled like a session
+// expiration via HandleMyNodeExpiration(). Returns the resulting rc.
+int CZClient::WatchNodeMasterDelete( const char *nodeName )
+{
+    const char method_name[] = "CZClient::WatchNodeMasterDelete";
+    TRACE_ENTRY;
+
+    int rc = -1;
+    stringstream newpath;
+    newpath.str( "" );
+    // Use the same path layout as CreateMasterZNode()/WatchMasterNode();
+    // without the "/" separator the delete targets a nonexistent znode.
+    newpath << zkRootNode_.c_str()
+            << zkRootNodeInstance_.c_str()
+            << ZCLIENT_MASTER_ZNODE << "/"
+            << nodeName;
+
+    string monZnode = newpath.str( );
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d zoo_delete(%s)\n"
+                    , method_name, __LINE__
+                    , monZnode.c_str() );
+    }
+
+    // version -1: delete regardless of the znode's current version
+    rc = zoo_delete( ZHandle
+                   , monZnode.c_str( )
+                   , -1 );
+    if ( rc == ZOK )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d (MasterMonitor) WatchNodeMasterDelete deleted %s, with rc == ZOK\n"
+                        , method_name, __LINE__
+                        , nodeName );
+        }
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], znode (%s) deleted!\n"
+                , method_name, nodeName );
+        mon_log_write(MON_ZCLIENT_WATCHMASTERNODEDELETE_1, SQ_LOG_INFO, buf);
+    }
+    else if ( rc == ZNONODE )
+    {
+        // This is fine since we call it indiscriminately
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d (MasterMonitor) WatchNodeMasterDelete %s not found, rc == ZNONODE (fine)\n"
+                        , method_name, __LINE__
+                        , nodeName );
+        }
+    }
+    else if ( rc == ZCONNECTIONLOSS ||
+              rc == ZOPERATIONTIMEOUT )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d (MasterMonitor) WatchNodeMasterDelete %s not deleted, rc = %d (%s)\n"
+                        , method_name, __LINE__
+                        , nodeName, rc, zerror(rc) );
+        }
+        rc = ZOK;
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], znode (%s) already deleted or cannot be accessed!\n"
+                , method_name, nodeName );
+        mon_log_write(MON_ZCLIENT_WATCHMASTERNODEDELETE_2, SQ_LOG_INFO, buf);
+    }
+    else
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d Error (MasterMonitor) WatchNodeMasterDelete %s failed, rc = %d (%s)\n"
+                        , method_name, __LINE__
+                        , nodeName, rc, zerror(rc) );
+        }
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], zoo_delete(%s) failed with error %s\n"
+                , method_name, nodeName, zerror(rc) );
+        mon_log_write(MON_ZCLIENT_WATCHMASTERNODEDELETE_3, SQ_LOG_CRIT, buf);
+        switch ( rc )
+        {
+        case ZSYSTEMERROR:
+        case ZRUNTIMEINCONSISTENCY:
+        case ZDATAINCONSISTENCY:
+        case ZMARSHALLINGERROR:
+        case ZUNIMPLEMENTED:
+        case ZBADARGUMENTS:
+        case ZINVALIDSTATE:
+        case ZSESSIONEXPIRED:
+        case ZCLOSING:
+            // Treat these error like a session expiration, since
+            // we can't communicate with quorum servers
+            HandleMyNodeExpiration();
+            break;
+        default:
+            break;
+        }
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+
int CZClient::WatchNodeDelete( const char *nodeName )
{
const char method_name[] = "CZClient::WatchNodeDelete";
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/monitor/linux/zclient.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zclient.h b/core/sqf/monitor/linux/zclient.h
index ea9bca3..6108021 100644
--- a/core/sqf/monitor/linux/zclient.h
+++ b/core/sqf/monitor/linux/zclient.h
@@ -104,6 +104,7 @@ using namespace std;
#define ZCLIENT_TRAFODION_ZNODE "/trafodion"
#define ZCLIENT_INSTANCE_ZNODE "/instance"
+#define ZCLIENT_MASTER_ZNODE "/master"
typedef list<string> ZNodeList_t;
@@ -137,6 +138,7 @@ public:
, const char *instanceZNode );
~CZClient( void );
+ int CreateMasterZNode( const char *nodeName );
int GetSessionTimeout( void) { return( zkSessionTimeout_ ); }
bool IsZNodeExpired( const char *nodeName, int &zerr );
void MonitorZCluster( void );
@@ -148,8 +150,11 @@ public:
int StartWork( void );
void StopMonitoring( void );
void TriggerCheck( int type, const char *znodePath );
+ const char* WaitForAndReturnMaster( bool doWait );
int WatchNode( const char *nodeName );
+ int WatchMasterNode( const char *nodeName );
int WatchNodeDelete( const char *nodeName );
+ int WatchNodeMasterDelete( const char *nodeName );
private:
int ZooExistRetry(zhandle_t *zh, const char *path, int watch, struct
Stat *stat);
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/sqenvcom.sh
----------------------------------------------------------------------
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index 7058637..c5a9164 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -673,6 +673,18 @@ export SQ_LUNMGR_VERBOSITY=1
# Control SQ default startup behavior (c=cold, w=warm, if removed sqstart will
autocheck)
export SQ_STARTUP=r
+# Monitor process creator:
+# MPIRUN - monitor process is created by mpirun
+# Uncomment SQ_MON_CREATOR when running monitor in AGENT mode
+#export SQ_MON_CREATOR=MPIRUN
+
+# Monitor process run mode:
+# AGENT - monitor process runs in agent mode versus MPI collective
+# Uncomment the three environment variables below
+#export SQ_MON_RUN_MODE=AGENT
+#export MONITOR_COMM_PORT=23399
+#export MONITOR_SYNC_PORT=23398
+
# Alternative logging capability in monitor
export SQ_MON_ALTLOG=0
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/sql/scripts/sqnodes.pm
----------------------------------------------------------------------
diff --git a/core/sqf/sql/scripts/sqnodes.pm b/core/sqf/sql/scripts/sqnodes.pm
index 0d09565..36d8f0c 100644
--- a/core/sqf/sql/scripts/sqnodes.pm
+++ b/core/sqf/sql/scripts/sqnodes.pm
@@ -279,10 +279,10 @@ sub verifyParse
displayStmt($stmtOk);
print " Error: node-id not specified\n";
}
- elsif ($nodeId > 1023)
+ elsif ($nodeId > 255)
{
displayStmt($stmtOk);
- print " Error: node-id must be in the range 0..1023.\n";
+ print " Error: node-id must be in the range 0..255.\n";
}
if (@cores == 0)
{
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/src/trafconf/clusterconf.cpp
----------------------------------------------------------------------
diff --git a/core/sqf/src/trafconf/clusterconf.cpp
b/core/sqf/src/trafconf/clusterconf.cpp
index 0ebffeb..039b87d 100644
--- a/core/sqf/src/trafconf/clusterconf.cpp
+++ b/core/sqf/src/trafconf/clusterconf.cpp
@@ -49,6 +49,7 @@ using namespace std;
CClusterConfig::CClusterConfig( void )
: CPNodeConfigContainer(TC_NODES_MAX)
, CLNodeConfigContainer(TC_NODES_MAX)
+ , configMaster_(-1)
, nodeReady_(false)
, persistReady_(false)
, newPNodeConfig_(true)
@@ -61,6 +62,8 @@ CClusterConfig::CClusterConfig( void )
const char method_name[] = "CClusterConfig::CClusterConfig";
TRACE_ENTRY;
+ memset( &configMasterName_, 0, TC_PROCESSOR_NAME_MAX );
+
TRACE_EXIT;
}
@@ -373,6 +376,13 @@ bool CClusterConfig::LoadNodeConfig( void )
for (int i =0; i < nodeCount; i++ )
{
ProcessLNode( nodeConfigData[i], pnodeConfigInfo, lnodeConfigInfo );
+ // We want to pick the first configured node so all monitors pick the
same one
+ // This only comes into play for a Trafodion start from scratch
+ if (i == 0)
+ {
+ configMaster_ = pnodeConfigInfo.pnid;
+ strcpy (configMasterName_ ,pnodeConfigInfo.nodename);
+ }
AddNodeConfiguration( pnodeConfigInfo, lnodeConfigInfo );
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/bded0e84/core/sqf/src/trafconf/clusterconf.h
----------------------------------------------------------------------
diff --git a/core/sqf/src/trafconf/clusterconf.h
b/core/sqf/src/trafconf/clusterconf.h
index 1a8942f..ff4b17e 100644
--- a/core/sqf/src/trafconf/clusterconf.h
+++ b/core/sqf/src/trafconf/clusterconf.h
@@ -43,6 +43,8 @@ public:
void Clear( void );
bool DeleteNodeConfig( int pnid );
+ int GetConfigMaster ( ) { return configMaster_;}
+ char * GetConfigMasterByName() {return configMasterName_;}
bool Initialize( void );
bool Initialize( bool traceEnabled, const char *traceFile );
void InitCoreMask( cpu_set_t &coreMask );
@@ -73,6 +75,8 @@ public:
protected:
private:
+ int configMaster_;
+ char configMasterName_[TC_PROCESSOR_NAME_MAX];
bool nodeReady_; // true when node configuration loaded
bool persistReady_; // true when persist configuration loaded
bool newPNodeConfig_;