This is an automated email from the ASF dual-hosted git repository.
ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git
The following commit(s) were added to refs/heads/ztao by this push:
new 04fee35 HAWQ-1827. enable rm to check segment down within RUAlive
04fee35 is described below
commit 04fee35750bfe8dd83b7d1172f21d4e950d28ddc
Author: ztao1987 <[email protected]>
AuthorDate: Sun Jan 9 16:41:53 2022 +0800
HAWQ-1827. enable rm to check segment down within RUAlive
---
.../communication/rmcomm_AsyncComm.c | 12 +++++++--
.../communication/rmcomm_RM2RMSEG.c | 5 +++-
.../communication/rmcomm_RMSEG2RM.c | 1 +
.../include/communication/rmcomm_AsyncComm.h | 1 +
.../resourcemanager/include/utils/network_utils.h | 4 +++
src/backend/resourcemanager/utils/network_utils.c | 30 ++++++++++++++++++++++
6 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
index f2203db..4abba94 100644
--- a/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
+++ b/src/backend/resourcemanager/communication/rmcomm_AsyncComm.c
@@ -414,7 +414,8 @@ int processAllCommFileDescs(void)
if ( CommBuffers[i]->ClientHostname.Str != NULL &&
CommBuffers[i]->ServerPort != 0 )
{
- elog(DEBUG3, "Return FD %d Index %d.",
CommBuffers[i]->FD, i);
+ elog(DEBUG3, "Return FD %d Index %d. host:port
%s:%d", CommBuffers[i]->FD, i,
+ CommBuffers[i]->ClientHostname.Str,
CommBuffers[i]->ServerPort);
returnAliveConnectionRemoteByHostname(
&(CommBuffers[i]->FD),
CommBuffers[i]->ClientHostname.Str,
@@ -659,6 +660,7 @@ int registerAsyncConnectionFileDesc(const char
*address,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void
*userdata,
+ bool
useNewConnection,
AsyncCommBuffer *newcommbuffer)
{
int res
= FUNC_RETURN_OK;
@@ -678,7 +680,13 @@ int registerAsyncConnectionFileDesc(const char
*address,
return UTIL_NETWORK_FAIL_GETHOST;
}
- if ( rm_enable_connpool )
+ if ( useNewConnection )
+ {
+ /* remove old connection in the connpool for RUAlive check */
+ removeAliveSocketConnection(address, resolvedaddr, port);
+ }
+
+ if ( !useNewConnection && rm_enable_connpool )
{
/* Try to get an alive connection from connection pool. */
fd = fetchAliveSocketConnection(address, resolvedaddr, port);
diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
index 2539356..64c447f 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c
@@ -111,12 +111,13 @@ int sendRUAlive(char *seghostname)
context->MessageCleanUpHandler = sentRUAliveCleanUp;
context->UserData = (void *)segres;
- /* Connect to HAWQ RM server */
+ /* always create new connection for RUAlive msg */
res = registerAsyncConnectionFileDesc(seghostname,
rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
+
true,
&newcommbuffer);
if ( res != FUNC_RETURN_OK )
{
@@ -342,6 +343,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet
containerset)
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
+
false,
&commbuffer);
if ( res != FUNC_RETURN_OK )
{
@@ -505,6 +507,7 @@ int decreaseMemoryQuota(char
*seghostname,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
+
false,
&commbuffer);
if ( res != FUNC_RETURN_OK )
{
diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
index 4c93f78..272709b 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
@@ -103,6 +103,7 @@ int sendIMAlive(int *errorcode,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
+
false,
&newcommbuffer);
if ( res != FUNC_RETURN_OK )
{
diff --git a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
index eb3cf8a..8010e13 100644
--- a/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
+++ b/src/backend/resourcemanager/include/communication/rmcomm_AsyncComm.h
@@ -101,6 +101,7 @@ int registerAsyncConnectionFileDesc(const char
*address,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void
*userdata,
+ bool
useNewConnection,
AsyncCommBuffer *newcommbuffer);
/* Process all registered file descriptors. */
diff --git a/src/backend/resourcemanager/include/utils/network_utils.h b/src/backend/resourcemanager/include/utils/network_utils.h
index a048131..ea37303 100644
--- a/src/backend/resourcemanager/include/utils/network_utils.h
+++ b/src/backend/resourcemanager/include/utils/network_utils.h
@@ -148,4 +148,8 @@ AddressString getAddressStringByHostName(const char
*hostname);
int fetchAliveSocketConnection(const char *hostname,
AddressString
address,
uint16_t port);
+
+int removeAliveSocketConnection(const char *hostname,
+ AddressString address,
+ uint16_t port);
#endif /* RESOURCE_MANANGER_NETWORK_UTILITIES_H */
diff --git a/src/backend/resourcemanager/utils/network_utils.c b/src/backend/resourcemanager/utils/network_utils.c
index 78a29cb..0cfdfad 100644
--- a/src/backend/resourcemanager/utils/network_utils.c
+++ b/src/backend/resourcemanager/utils/network_utils.c
@@ -603,6 +603,36 @@ int fetchAliveSocketConnection(const char *hostname,
return res;
}
+/* Close and forget every pooled connection for address:port (hostname is only
+ * logged). Returns -1 if the pool has no entry for it, else FUNC_RETURN_OK. */
+int removeAliveSocketConnection(const char *hostname,
+ AddressString address, uint16_t port)
+{
+ int result = -1;
+ ConnAddressString keystr = createConnAddressString(address, port);
+ SimpArray hashkey;
+ setSimpleArrayRef(&hashkey, (char *)keystr, SIZEOFCONNADDRSTRING(keystr));
+ PAIR entry = getHASHTABLENode(&ActiveConnections, (void *)&hashkey);
+ if ( entry != NULL )
+ {
+ List *conns = (List *)(entry->Value);
+ entry->Value = NULL; /* detach first: entry never sees freed cells */
+ while( conns != NULL )
+ {
+ int sockfd = lfirst_int(list_head(conns));
+ MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
+ conns = list_delete_first(conns);
+ MEMORY_CONTEXT_SWITCH_BACK
+ elog(DEBUG3, "Removed FD %d for %s:%d.", sockfd, hostname, port);
+ closeConnectionRemote(&sockfd);
+ }
+ removeHASHTABLENode(&ActiveConnections, (void *)&hashkey);
+ result = FUNC_RETURN_OK;
+ }
+ freeConnAddressString(keystr);
+ return result;
+}
+
void returnAliveConnectionRemoteByHostname(int *clientfd,
const char *hostname,
uint16_t port)