This is an automated email from the ASF dual-hosted git repository. ztao1987 pushed a commit to branch ztao in repository https://gitbox.apache.org/repos/asf/hawq.git
The following commit(s) were added to refs/heads/ztao by this push: new 04fee35 HAWQ-1827. enable rm to check segment down within RUAlive 04fee35 is described below commit 04fee35750bfe8dd83b7d1172f21d4e950d28ddc Author: ztao1987 <zhenglin.ta...@gmail.com> AuthorDate: Sun Jan 9 16:41:53 2022 +0800 HAWQ-1827. enable rm to check segment down within RUAlive --- .../communication/rmcomm_AsyncComm.c | 12 +++++++-- .../communication/rmcomm_RM2RMSEG.c | 5 +++- .../communication/rmcomm_RMSEG2RM.c | 1 + .../include/communication/rmcomm_AsyncComm.h | 1 + .../resourcemanager/include/utils/network_utils.h | 4 +++ src/backend/resourcemanager/utils/network_utils.c | 30 ++++++++++++++++++++++ 6 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c index f2203db..4abba94 100644 --- a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c +++ b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c @@ -414,7 +414,8 @@ int processAllCommFileDescs(void) if ( CommBuffers[i]->ClientHostname.Str != NULL && CommBuffers[i]->ServerPort != 0 ) { - elog(DEBUG3, "Return FD %d Index %d.", CommBuffers[i]->FD, i); + elog(DEBUG3, "Return FD %d Index %d. host:port %s:%d", CommBuffers[i]->FD, i, + CommBuffers[i]->ClientHostname.Str, CommBuffers[i]->ServerPort); returnAliveConnectionRemoteByHostname( &(CommBuffers[i]->FD), CommBuffers[i]->ClientHostname.Str, @@ -659,6 +660,7 @@ int registerAsyncConnectionFileDesc(const char *address, uint32_t actionmask, AsyncCommBufferHandlers methods, void *userdata, + bool useNewConnection, AsyncCommBuffer *newcommbuffer) { int res = FUNC_RETURN_OK; @@ -678,7 +680,13 @@ int registerAsyncConnectionFileDesc(const char *address, return UTIL_NETWORK_FAIL_GETHOST; } - if ( rm_enable_connpool ) + if ( useNewConnection ) + { + /* remove old connection in the connpool for RUAlive check */ + removeAliveSocketConnection(address, resolvedaddr, port); + } + + if ( !useNewConnection && rm_enable_connpool ) { /* Try to get an alive connection from connection pool. */ fd = fetchAliveSocketConnection(address, resolvedaddr, port); diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c index 2539356..64c447f 100644 --- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c +++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c @@ -111,12 +111,13 @@ int sendRUAlive(char *seghostname) context->MessageCleanUpHandler = sentRUAliveCleanUp; context->UserData = (void *)segres; - /* Connect to HAWQ RM server */ + /* always create new connection for RUAlive msg */ res = registerAsyncConnectionFileDesc(seghostname, rm_segment_port, ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES, &AsyncCommBufferHandlersMessage, context, + true, &newcommbuffer); if ( res != FUNC_RETURN_OK ) { @@ -342,6 +343,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset) ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES, &AsyncCommBufferHandlersMessage, context, + false, &commbuffer); if ( res != FUNC_RETURN_OK ) { @@ -505,6 +507,7 @@ int decreaseMemoryQuota(char *seghostname, ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES, &AsyncCommBufferHandlersMessage, context, + false, &commbuffer); if ( res != FUNC_RETURN_OK ) { diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c index 4c93f78..272709b 100644 --- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c +++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c @@ -103,6 +103,7 @@ int sendIMAlive(int *errorcode, ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES, &AsyncCommBufferHandlersMessage, context, + false, &newcommbuffer); if ( res != FUNC_RETURN_OK ) { diff --git a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h index eb3cf8a..8010e13 100644 --- a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h +++ b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h @@ -101,6 +101,7 @@ int registerAsyncConnectionFileDesc(const char *address, uint32_t actionmask, AsyncCommBufferHandlers methods, void *userdata, + bool useNewConnection, AsyncCommBuffer *newcommbuffer); /* Process all registered file descriptors. */ diff --git a/src/backend/resourcemanager/include/utils/network_utils.h b/src/backend/resourcemanager/include/utils/network_utils.h index a048131..ea37303 100644 --- a/src/backend/resourcemanager/include/utils/network_utils.h +++ b/src/backend/resourcemanager/include/utils/network_utils.h @@ -148,4 +148,8 @@ AddressString getAddressStringByHostName(const char *hostname); int fetchAliveSocketConnection(const char *hostname, AddressString address, uint16_t port); + +int removeAliveSocketConnection(const char *hostname, + AddressString address, + uint16_t port); #endif /* RESOURCE_MANANGER_NETWORK_UTILITIES_H */ diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c index 78a29cb..0cfdfad 100644 --- a/src/backend/resourcemanager/utils/network_utils.c +++ b/src/backend/resourcemanager/utils/network_utils.c @@ -603,6 +603,36 @@ int fetchAliveSocketConnection(const char *hostname, return res; } +int removeAliveSocketConnection(const char *hostname, + AddressString address, + uint16_t port) +{ + ConnAddressString connaddr = createConnAddressString(address, port); + SimpArray key; + setSimpleArrayRef(&key, (char *)connaddr, SIZEOFCONNADDRSTRING(connaddr)); + PAIR pair = getHASHTABLENode(&ActiveConnections, (void *)&key); + if ( pair == NULL ) + { + freeConnAddressString(connaddr); + return -1; + } + List *aliveconns = (List *)(pair->Value); + while( aliveconns != NULL ) + { + int fd = lfirst_int(list_head(aliveconns)); + MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) + aliveconns = list_delete_first(aliveconns); + MEMORY_CONTEXT_SWITCH_BACK + elog(DEBUG3, "Removed FD %d for %s:%d.", fd, hostname, port); + closeConnectionRemote(&fd); + } + pair->Value = NULL; + removeHASHTABLENode(&ActiveConnections, (void *)&key); + freeConnAddressString(connaddr); + + return FUNC_RETURN_OK; +} + void returnAliveConnectionRemoteByHostname(int *clientfd, const char *hostname, uint16_t port)