This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git


The following commit(s) were added to refs/heads/ztao by this push:
     new 04fee35  HAWQ-1827. enable rm to check segment down within RUAlive
04fee35 is described below

commit 04fee35750bfe8dd83b7d1172f21d4e950d28ddc
Author: ztao1987 <zhenglin.ta...@gmail.com>
AuthorDate: Sun Jan 9 16:41:53 2022 +0800

    HAWQ-1827. enable rm to check segment down within RUAlive
---
 .../communication/rmcomm_AsyncComm.c               | 12 +++++++--
 .../communication/rmcomm_RM2RMSEG.c                |  5 +++-
 .../communication/rmcomm_RMSEG2RM.c                |  1 +
 .../include/communication/rmcomm_AsyncComm.h       |  1 +
 .../resourcemanager/include/utils/network_utils.h  |  4 +++
 src/backend/resourcemanager/utils/network_utils.c  | 30 ++++++++++++++++++++++
 6 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
index f2203db..4abba94 100644
--- a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
+++ b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
@@ -414,7 +414,8 @@ int processAllCommFileDescs(void)
                        if ( CommBuffers[i]->ClientHostname.Str != NULL &&
                                 CommBuffers[i]->ServerPort != 0 )
                        {
-                               elog(DEBUG3, "Return FD %d Index %d.", CommBuffers[i]->FD, i);
+                               elog(DEBUG3, "Return FD %d Index %d. host:port %s:%d", CommBuffers[i]->FD, i,
+                                    CommBuffers[i]->ClientHostname.Str, CommBuffers[i]->ServerPort);
                                returnAliveConnectionRemoteByHostname(
                                                        &(CommBuffers[i]->FD),
                                                        
CommBuffers[i]->ClientHostname.Str,
@@ -659,6 +660,7 @@ int registerAsyncConnectionFileDesc(const char              
                *address,
                                                                        
uint32_t                                 actionmask,
                                                                        
AsyncCommBufferHandlers  methods,
                                                                        void    
                                *userdata,
+                                                                       bool useNewConnection,
                                                                        
AsyncCommBuffer                 *newcommbuffer)
 {
        int                                     res                             
= FUNC_RETURN_OK;
@@ -678,7 +680,13 @@ int registerAsyncConnectionFileDesc(const char             
                *address,
                return UTIL_NETWORK_FAIL_GETHOST;
        }
 
-       if ( rm_enable_connpool )
+       if ( useNewConnection )
+       {
+         /* remove old connection in the connpool for RUAlive check */
+         removeAliveSocketConnection(address, resolvedaddr, port);
+       }
+
+       if ( !useNewConnection && rm_enable_connpool )
        {
                /* Try to get an alive connection from connection pool. */
                fd = fetchAliveSocketConnection(address, resolvedaddr, port);
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index 2539356..64c447f 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -111,12 +111,13 @@ int sendRUAlive(char *seghostname)
        context->MessageCleanUpHandler   = sentRUAliveCleanUp;
        context->UserData                = (void *)segres;
 
-       /* Connect to HAWQ RM server */
+       /* always create new connection for RUAlive msg */
        res = registerAsyncConnectionFileDesc(seghostname,
                                                                                
  rm_segment_port,
                                                                                
  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
                                                                                
  &AsyncCommBufferHandlersMessage,
                                                                                
  context,
+                                                                               
  true,
                                                                                
  &newcommbuffer);
        if ( res != FUNC_RETURN_OK )
        {
@@ -342,6 +343,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
                                                                                
  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
                                                                                
  &AsyncCommBufferHandlersMessage,
                                                                                
  context,
+                                                                               
  false,
                                                                                
  &commbuffer);
     if ( res != FUNC_RETURN_OK )
     {
@@ -505,6 +507,7 @@ int decreaseMemoryQuota(char                        
*seghostname,
                                                                                
  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
                                                                                
  &AsyncCommBufferHandlersMessage,
                                                                                
  context,
+                                                                               
  false,
                                                                                
  &commbuffer);
     if ( res != FUNC_RETURN_OK )
     {
diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
index 4c93f78..272709b 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
@@ -103,6 +103,7 @@ int sendIMAlive(int  *errorcode,
                                                                                
  ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
                                                                                
  &AsyncCommBufferHandlersMessage,
                                                                                
  context,
+                                                                               
  false,
                                                                                
  &newcommbuffer);
        if ( res != FUNC_RETURN_OK )
        {
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
index eb3cf8a..8010e13 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
@@ -101,6 +101,7 @@ int registerAsyncConnectionFileDesc(const char              
                *address,
                                                                        
uint32_t                                 actionmask,
                                                                        
AsyncCommBufferHandlers  methods,
                                                                        void    
                                *userdata,
+                                                                       bool useNewConnection,
                                                                        
AsyncCommBuffer                 *newcommbuffer);
 
 /* Process all registered file descriptors. */
diff --git a/src/backend/resourcemanager/include/utils/network_utils.h b/src/backend/resourcemanager/include/utils/network_utils.h
index a048131..ea37303 100644
--- a/src/backend/resourcemanager/include/utils/network_utils.h
+++ b/src/backend/resourcemanager/include/utils/network_utils.h
@@ -148,4 +148,8 @@ AddressString getAddressStringByHostName(const char *hostname);
 int fetchAliveSocketConnection(const char       *hostname,
                                                           AddressString  
address,
                                                           uint16_t       port);
+
+int removeAliveSocketConnection(const char    *hostname,
+                                AddressString  address,
+                                uint16_t     port);
 #endif /* RESOURCE_MANANGER_NETWORK_UTILITIES_H */
diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c
index 78a29cb..0cfdfad 100644
--- a/src/backend/resourcemanager/utils/network_utils.c
+++ b/src/backend/resourcemanager/utils/network_utils.c
@@ -603,6 +603,36 @@ int fetchAliveSocketConnection(const char   *hostname,
        return res;
 }
 
+int removeAliveSocketConnection(const char    *hostname,
+                                AddressString  address,
+                                uint16_t     port)
+{
+  ConnAddressString connaddr = createConnAddressString(address, port);
+  SimpArray key;
+  setSimpleArrayRef(&key, (char *)connaddr, SIZEOFCONNADDRSTRING(connaddr));
+  PAIR pair = getHASHTABLENode(&ActiveConnections, (void *)&key);
+  if ( pair == NULL )
+  {
+    freeConnAddressString(connaddr);
+    return -1;
+  }
+  List *aliveconns = (List *)(pair->Value);
+  while( aliveconns != NULL )
+  {
+    int fd = lfirst_int(list_head(aliveconns));
+    MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+    aliveconns = list_delete_first(aliveconns);
+    MEMORY_CONTEXT_SWITCH_BACK
+    elog(DEBUG3, "Removed FD %d for %s:%d.", fd, hostname, port);
+    closeConnectionRemote(&fd);
+  }
+  pair->Value = NULL;
+  removeHASHTABLENode(&ActiveConnections, (void *)&key);
+  freeConnAddressString(connaddr);
+
+  return FUNC_RETURN_OK;
+}
+
 void returnAliveConnectionRemoteByHostname(int                   *clientfd,
                                                                                
   const char *hostname,
                                                                                
   uint16_t port)

Reply via email to