More p2p fixes.

Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/1584dc43
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/1584dc43
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/1584dc43

Branch: refs/heads/master
Commit: 1584dc4372686ba901fc745b41492f810dadcae8
Parents: 71d6a1b
Author: Zalo Correa <[email protected]>
Authored: Fri Apr 6 08:02:21 2018 -0700
Committer: Zalo Correa <[email protected]>
Committed: Fri Apr 6 08:02:21 2018 -0700

----------------------------------------------------------------------
 core/sqf/monitor/linux/pnode.cxx         | 37 ++++++++++++++++++--
 core/sqf/monitor/linux/pnode.h           |  2 +-
 core/sqf/monitor/linux/ptpclient.cxx     | 50 +++++++++++++++++++++++++++
 core/sqf/monitor/linux/ptpclient.h       |  5 +++
 core/sqf/monitor/linux/ptpcommaccept.cxx | 10 ++++++
 core/sqf/monitor/linux/reqnewproc.cxx    |  6 ++--
 core/sqf/src/seabed/test/goall           |  2 +-
 7 files changed, 104 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/monitor/linux/pnode.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx
index 5c51ada..5da34cd 100644
--- a/core/sqf/monitor/linux/pnode.cxx
+++ b/core/sqf/monitor/linux/pnode.cxx
@@ -51,6 +51,9 @@ using namespace std;
 #include "replicate.h"
 #include "reqqueue.h"
 #include "healthcheck.h"
+#ifndef NAMESERVER_PROCESS
+#include "ptpclient.h"
+#endif
 
 extern CReqQueue ReqQueue;
 extern char MyPath[MAX_PROCESS_PATH];
@@ -82,6 +85,7 @@ extern CClusterConfig *ClusterConfig;
 const char *StateString( STATE state);
 #ifndef NAMESERVER_PROCESS
 const char *SyncStateString( SyncState state);
+extern CPtpClient *PtpClient;
 extern CNameServer *NameServer;
 extern CProcess *NameServerProcess;
 extern bool NameServerEnabled;
@@ -1013,7 +1017,7 @@ bool CNode::GetSchedulingData( void )
 }
 
 
-strId_t CNode::GetStringId(char * candidate)
+strId_t CNode::GetStringId( char *candidate, CLNode *targetLNode )
 {
     const char method_name[] = "CNode::GetStringId";
     strId_t id;
@@ -1025,10 +1029,37 @@ strId_t CNode::GetStringId(char * candidate)
         id.id  = uniqStrId_++;
         id.nid = pnid_;
 
+        if (trace_settings & TRACE_PROCESS)
+        {
+            trace_printf("%s@%d - Adding unique string id=[%d,%d] (%s), targetLnode=%p\n",
+                         method_name, __LINE__, id.nid, id.id, candidate, targetLNode );
+        }
+
         Config->addUniqueString(id.nid, id.id, candidate);
 
-        CReplUniqStr *repl = new CReplUniqStr ( id.nid, id.id, candidate );
-        Replicator.addItem(repl);
+#ifndef NAMESERVER_PROCESS
+        if (NameServerEnabled)
+        {
+            if (targetLNode != NULL &&
+                !MyNode->IsMyNode(targetLNode->GetNid()))
+            {
+                // Forward the unique string to the target node
+                PtpClient->AddUniqStr( id.nid
+                                     , id.id
+                                     , candidate
+                                     , targetLNode->GetNid()
+                                     , targetLNode->GetNode()->GetName());
+            }
+        }
+        else
+#endif
+        {
+#ifdef NAMESERVER_PROCESS
+            targetLNode = targetLNode;  // Make compiler happy!
+#endif
+            CReplUniqStr *repl = new CReplUniqStr ( id.nid, id.id, candidate );
+            Replicator.addItem(repl);
+        }
     }
     // temp trace
     else

http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/monitor/linux/pnode.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.h b/core/sqf/monitor/linux/pnode.h
index f0b89bb..eb4829e 100644
--- a/core/sqf/monitor/linux/pnode.h
+++ b/core/sqf/monitor/linux/pnode.h
@@ -256,7 +256,7 @@ public:
     // If candidate string has not been seen before assign a unique
     // id and store it in the config database.   In either case return
     // the unique id as the value of the method.
-    strId_t GetStringId(char *candidate);
+    strId_t GetStringId( char *candidate, CLNode *targetLNode = NULL );
 
     inline int   GetTmSyncNid( void ) { return( tmSyncNid_ ); }
     inline SyncState GetTmSyncState( void ) { return( tmSyncState_ ); }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/monitor/linux/ptpclient.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx
index 98ca3a4..0593498 100644
--- a/core/sqf/monitor/linux/ptpclient.cxx
+++ b/core/sqf/monitor/linux/ptpclient.cxx
@@ -97,6 +97,56 @@ CPtpClient::~CPtpClient (void)
     TRACE_EXIT;
 }
 
+int  CPtpClient::AddUniqStr( int nid
+                           , int id
+                           , const char *stringValue
+                           , int targetNid
+                           , const char *targetNodeName )
+{
+    const char method_name[] = "CPtpClient::AddUniqStr";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+    {
+        trace_printf( "%s@%d - Sending InternalType_UniqStr request to %s, "
+                      "targetNid=%d\n"
+                    , method_name, __LINE__
+                    , targetNodeName
+                    , targetNid );
+    }
+
+    struct internal_msg_def msg;
+    memset(&msg, 0, sizeof(msg)); 
+    msg.type = InternalType_UniqStr;
+    msg.u.uniqstr.nid = nid;
+    msg.u.uniqstr.id  = id;
+
+    char *stringData = & msg.u.uniqstr.valueData;
+    int  stringDataLen = strlen(stringValue) + 1;
+
+    // Copy the string
+    memcpy( stringData, stringValue, stringDataLen );
+
+    int size = offsetof(struct internal_msg_def, u);
+    size += sizeof(msg.u.uniqstr);
+    size += stringDataLen;
+    
+    if (trace_settings & TRACE_PROCESS_DETAIL)
+    {
+        trace_printf( "%s@%d - size_=%d, forwarding unique string [%d, %d] (%s)\n"
+                    , method_name, __LINE__
+                    , size
+                    , msg.u.uniqstr.nid
+                    , msg.u.uniqstr.id
+                    , &msg.u.uniqstr.valueData  );
+    }
+
+    int error = SendToMon("add-unique-string", &msg, size, targetNid, targetNodeName);
+    
+    TRACE_EXIT;
+    return error;
+}
+
 int CPtpClient::InitializePtpClient( char * ptpPort )
 {
     const char method_name[] = "CPtpClient::InitializePtpClient";

http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/monitor/linux/ptpclient.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h
index d46ea5e..87f2315 100644
--- a/core/sqf/monitor/linux/ptpclient.h
+++ b/core/sqf/monitor/linux/ptpclient.h
@@ -40,6 +40,11 @@ public:
     CPtpClient( void );
     virtual ~CPtpClient( void );
 
+    int  AddUniqStr( int nid
+                   , int id
+                   , const char *stringValue
+                   , int targetNid
+                   , const char *targetNodeName );
     int  InitializePtpClient( char * ptpPort );
     int  ProcessClone( CProcess *process );
     int  ProcessExit( CProcess* process

http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/monitor/linux/ptpcommaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx
index c6d5145..dc48b41 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.cxx
+++ b/core/sqf/monitor/linux/ptpcommaccept.cxx
@@ -104,6 +104,16 @@ void CPtpCommAccept::processNewSock( int sockFd )
     {
         switch ( msg.type )
         {
+            case InternalType_UniqStr:
+            {
+                if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+                {
+                    trace_printf( "%s@%d" " - Received InternalType_UniqStr\n"
+                                , method_name, __LINE__ );
+                }
+                ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr);
+                break;
+            }
             case InternalType_Process:
             {
                 if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))

http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/monitor/linux/reqnewproc.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqnewproc.cxx b/core/sqf/monitor/linux/reqnewproc.cxx
index 5ebfec5..b5f9ec1 100644
--- a/core/sqf/monitor/linux/reqnewproc.cxx
+++ b/core/sqf/monitor/linux/reqnewproc.cxx
@@ -441,9 +441,9 @@ void CExtNewProcReq::performRequest()
     
         if (lnode->GetNumProcs() < MAX_PROCESSES)
         {
-            strId_t pathStrId = MyNode->GetStringId ( msg_->u.request.u.new_process.path );
-            strId_t ldpathStrId = MyNode->GetStringId (msg_->u.request.u.new_process.ldpath );
-            strId_t programStrId = MyNode->GetStringId ( msg_->u.request.u.new_process.program );
+            strId_t pathStrId = MyNode->GetStringId ( msg_->u.request.u.new_process.path, lnode );
+            strId_t ldpathStrId = MyNode->GetStringId (msg_->u.request.u.new_process.ldpath, lnode );
+            strId_t programStrId = MyNode->GetStringId ( msg_->u.request.u.new_process.program, lnode );
 
             if (MyNode->IsMyNode(lnode->Nid))
             {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/1584dc43/core/sqf/src/seabed/test/goall
----------------------------------------------------------------------
diff --git a/core/sqf/src/seabed/test/goall b/core/sqf/src/seabed/test/goall
index 468ce62..0c213e7 100755
--- a/core/sqf/src/seabed/test/goall
+++ b/core/sqf/src/seabed/test/goall
@@ -255,7 +255,7 @@ go241 $cluster $verbose
 go242 $cluster $verbose
 go245 $cluster $verbose
 go246 $cluster $verbose
-go249 $cluster $verbose
+####go249 $cluster $verbose
 ####go250 $cluster $verbose
 go253 $cluster $verbose
 go261 $cluster $verbose

Reply via email to