This is an automated email from the ASF dual-hosted git repository. ztao1987 pushed a commit to branch ztao in repository https://gitbox.apache.org/repos/asf/hawq.git
commit c3b1c41cf624cceb5a2c7dd3f8729511c298c9dc Author: ztao1987 <[email protected]> AuthorDate: Sat Dec 4 17:10:13 2021 +0800 HAWQ-1817. add exception handling for dispatcher --- src/backend/cdb/cdbconn.c | 20 +++++++++++++++++++- src/backend/cdb/cdbvars.c | 1 + src/backend/cdb/dispatcher_mgr.c | 4 ++-- src/backend/cdb/dispatcher_new.c | 10 ++++++++++ src/backend/cdb/executormgr_new.c | 32 +++++++++++++++++++++----------- src/backend/gp_libpq_fe/fe-connect.c | 12 ++++++------ src/backend/gp_libpq_fe/fe-protocol3.c | 1 + src/backend/gp_libpq_fe/gp-libpq-fe.h | 2 +- src/backend/tcop/postgres.c | 5 +++++ src/include/cdb/cdbvars.h | 2 ++ src/include/cdb/dispatcher_new.h | 1 + 11 files changed, 69 insertions(+), 21 deletions(-) diff --git a/src/backend/cdb/cdbconn.c b/src/backend/cdb/cdbconn.c index 87aed62..dfa6178 100644 --- a/src/backend/cdb/cdbconn.c +++ b/src/backend/cdb/cdbconn.c @@ -445,8 +445,26 @@ cdbconn_main_doconnect(SegmentDatabaseDescriptor *segdbDesc, * Wait for it to respond giving us the TCP port number * where it listens for connections from the gang below. */ - PQgetQEsDetail(segdbDesc->conn, connMsg->data, connMsg->len); + if(!PQgetQEsDetail(segdbDesc->conn, connMsg->data, connMsg->len)){ + if (!segdbDesc->errcode) + segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR; + write_log("Master unable to getQEsDetail from %s : %s\nConnection option: %s", + segdbDesc->whoami, PQerrorMessage(segdbDesc->conn), + connection_string); + + appendPQExpBuffer(&segdbDesc->error_message, + "Master unable to getQEsDetail from %s: %s\n", + segdbDesc->whoami, PQerrorMessage(segdbDesc->conn)); + + /* Don't use elog, it's not thread-safe */ + if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG) + write_log("%s\n", segdbDesc->error_message.data); + + PQfinish(segdbDesc->conn); + segdbDesc->conn = NULL; + return false; + } /* * Check for connection reset. diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c index c5a770b..0225d34 100644 --- a/src/backend/cdb/cdbvars.c +++ b/src/backend/cdb/cdbvars.c @@ -54,6 +54,7 @@ GpRoleValue Gp_role; /* Role paid by this Greenplum Database backend */ char *gp_role_string; /* Staging area for guc.c */ bool gp_set_read_only; /* Staging area for guc.c */ +bool proxy_dispatcher_prepare_error = false; GpRoleValue Gp_session_role; /* Role paid by this Greenplum Database backend */ char *gp_session_role_string; /* Staging area for guc.c */ diff --git a/src/backend/cdb/dispatcher_mgr.c b/src/backend/cdb/dispatcher_mgr.c index b1e4feb..a8286f5 100644 --- a/src/backend/cdb/dispatcher_mgr.c +++ b/src/backend/cdb/dispatcher_mgr.c @@ -91,7 +91,7 @@ void mainDispatchFuncConnect(struct MyQueryExecutorGroup *qeGrp, foreach (lc, qeGrp->qes) { myQe = lfirst(lc); - if (workermgr_should_query_stop(state)) break; + if (workermgr_should_query_stop(state)) goto error; if (!executormgr_main_doconnect(myQe)) goto error; } @@ -201,7 +201,7 @@ void proxyDispatchFuncConnect(struct MyQueryExecutorGroup *qeGrp, foreach (lc, qeGrp->qes) { myQe = lfirst(lc); - if (workermgr_should_query_stop(state)) break; + if (workermgr_should_query_stop(state)) goto error; if (!executormgr_proxy_doconnect(myQe)) { write_log("%s: failed to startup new qe.", __func__); diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c index 09053f2..f00afce 100644 --- a/src/backend/cdb/dispatcher_new.c +++ b/src/backend/cdb/dispatcher_new.c @@ -1383,6 +1383,12 @@ void proxyDispatchPrepare(struct ProxyDispatchData *data) { workermgr_wait_job(state); } + if(proxyDispatchHasError(data)){ + MemoryContextSwitchTo(old); + ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), + errmsg("proxy dispatcher failed to connect to segment"))); + } + MemoryContextSwitchTo(old); } @@ -1451,6 +1457,10 @@ void proxyDispatchCleanUp(struct ProxyDispatchData **data) { *data = NULL; } +bool proxyDispatchHasError(struct ProxyDispatchData *data) { + return data->cdata.results && data->cdata.results->errcode; +} + void mainDispatchStmtNode(struct Node *node, struct QueryContextInfo *ctx, struct QueryResource *resource, struct DispatchDataResult *result) { diff --git a/src/backend/cdb/executormgr_new.c b/src/backend/cdb/executormgr_new.c index 910ca6f..8085783 100644 --- a/src/backend/cdb/executormgr_new.c +++ b/src/backend/cdb/executormgr_new.c @@ -316,15 +316,25 @@ bool executormgr_main_consumeData(struct MyQueryExecutor *qe) { done = true; break; } - int qeIndex; - cdbdisp_deserializeDispatchResult(NULL, &qeIndex, &conn->dispBuffer); - myQe = getTaskRefQE((struct MyDispatchTask *)(list_nth( - list_nth(getTaskPerSegmentList(qe->refTask), qe->execIndex), qeIndex))); - struct CdbDispatchResult *refResult = myQe->refResult; - cdbdisp_deserializeDispatchResult(refResult, &qeIndex, &conn->dispBuffer); - conn->asyncStatus = PGASYNC_BUSY; - if (refResult->errcode != 0) { - cdbdisp_seterrcode(refResult->errcode, -1, refResult); + if (PQstatus(conn) == CONNECTION_BAD) goto error; + + if (conn->dispBuffer.len != 0) { + int qeIndex; + cdbdisp_deserializeDispatchResult(NULL, &qeIndex, &conn->dispBuffer); + myQe = getTaskRefQE((struct MyDispatchTask *)(list_nth( + list_nth(getTaskPerSegmentList(qe->refTask), qe->execIndex), + qeIndex))); + struct CdbDispatchResult *refResult = myQe->refResult; + cdbdisp_deserializeDispatchResult(refResult, &qeIndex, &conn->dispBuffer); + conn->asyncStatus = PGASYNC_BUSY; + if (refResult->errcode != 0) { + cdbdisp_seterrcode(refResult->errcode, -1, refResult); + goto error; + } + } else { + PQgetResult(conn); + write_log("main dispatcher got error msg from proxy dispatcher: %s", + conn->errorMessage.data); goto error; } } @@ -467,7 +477,7 @@ bool executormgr_main_cancel(struct MyQueryExecutor *qe) { char errbuf[256]; MemSet(errbuf, 0, sizeof(errbuf)); bool success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0); - if(!success){ + if (!success) { write_log("executormgr_main_cancel cancel failed, %s.", errbuf); } PQfreeCancel(cn); @@ -488,7 +498,7 @@ bool executormgr_proxy_cancel(struct MyQueryExecutor *qe, bool cancelRequest) { char errbuf[256]; MemSet(errbuf, 0, sizeof(errbuf)); success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0); - if(!success){ + if (!success) { write_log("executormgr_proxy_cancel cancel failed, %s.", errbuf); } PQfreeCancel(cn); diff --git a/src/backend/gp_libpq_fe/fe-connect.c b/src/backend/gp_libpq_fe/fe-connect.c index 4a36b68..f75d7da 100644 --- a/src/backend/gp_libpq_fe/fe-connect.c +++ b/src/backend/gp_libpq_fe/fe-connect.c @@ -3501,9 +3501,9 @@ PQoptions(const PGconn *conn) return conn->pgoptions; } -int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) { +bool PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) { if (!conn || (PQstatus(conn) == CONNECTION_BAD)) - return -1; + return false; pqPacketSend(conn, 'V', connMsg, connMsgLen+1); resetPQExpBuffer(&conn->dispBuffer); @@ -3512,21 +3512,21 @@ int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) { if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3) pqParseInput3(conn); else - return -1; + return false; while (!conn->dispBuffer.len) { pqWait(TRUE, FALSE, conn); if (pqReadData(conn) < 0) - return -1; + return false; if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3) pqParseInput3(conn); else - return -1; + return false; } - return 0; + return true; } /* GPDB function to retrieve QE-backend details (motion listener) */ diff --git a/src/backend/gp_libpq_fe/fe-protocol3.c b/src/backend/gp_libpq_fe/fe-protocol3.c index 89b055f..1254f90 100644 --- a/src/backend/gp_libpq_fe/fe-protocol3.c +++ b/src/backend/gp_libpq_fe/fe-protocol3.c @@ -293,6 +293,7 @@ pqParseInput3(PGconn *conn) } break; case 'E': /* error return */ + conn->dispBuffer.len = 0; if (pqGetErrorNotice3(conn, true)) return; conn->asyncStatus = PGASYNC_READY; diff --git a/src/backend/gp_libpq_fe/gp-libpq-fe.h b/src/backend/gp_libpq_fe/gp-libpq-fe.h index 2184033..be1cc83 100644 --- a/src/backend/gp_libpq_fe/gp-libpq-fe.h +++ b/src/backend/gp_libpq_fe/gp-libpq-fe.h @@ -320,7 +320,7 @@ extern char *PQport(const PGconn *conn); extern char *PQtty(const PGconn *conn); extern char *PQoptions(const PGconn *conn); extern int PQgetQEdetail(PGconn *conn, bool alwaysFetch); /* GPDB -- retrieve QE-backend details. */ -extern int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen); +extern bool PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen); extern ConnStatusType PQstatus(const PGconn *conn); extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn); extern const char *PQparameterStatus(const PGconn *conn, diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 085ab08..ba6c17a 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4784,6 +4784,10 @@ PostgresMain(int argc, char *argv[], const char *username) /* Now we can allow interrupts again */ RESUME_INTERRUPTS(); + + if(proxy_dispatcher_prepare_error){ + exit(0); + } } /* We can now handle ereport(ERROR) */ @@ -5512,6 +5516,7 @@ PostgresMain(int argc, char *argv[], const char *username) } PG_CATCH(); { + proxy_dispatcher_prepare_error = true; proxyDispatchCleanUp(&dispatchData); PG_RE_THROW(); } diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h index fb7f30d..5b58562 100644 --- a/src/include/cdb/cdbvars.h +++ b/src/include/cdb/cdbvars.h @@ -144,6 +144,8 @@ extern char *gp_role_string; /* Use by guc.c as staging area for value. */ extern const char *assign_gp_role(const char *newval, bool doit, GucSource source); extern const char *show_gp_role(void); +extern bool proxy_dispatcher_prepare_error; + extern bool gp_reraise_signal; /* try to force a core dump ?*/ extern bool gp_version_mismatch_error; /* Enforce same-version on QD&QE. */ diff --git a/src/include/cdb/dispatcher_new.h b/src/include/cdb/dispatcher_new.h index ea84071..c0aeb9f 100644 --- a/src/include/cdb/dispatcher_new.h +++ b/src/include/cdb/dispatcher_new.h @@ -59,6 +59,7 @@ extern void sendSegQEDetails(struct ProxyDispatchData *data); extern void proxyDispatchRun(struct ProxyDispatchData *data, char *connMsg); extern void proxyDispatchWait(struct ProxyDispatchData *data); extern void proxyDispatchCleanUp(struct ProxyDispatchData **data); +extern bool proxyDispatchHasError(struct ProxyDispatchData *data); // dispatch statement extern void mainDispatchStmtNode(struct Node *node,
