Repository: incubator-hawq Updated Branches: refs/heads/master 7ce195a3f -> 571c50b07
HAWQ-1334. QD thread should set error code if failing so that the main process for the query could exit soon Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/571c50b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/571c50b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/571c50b0 Branch: refs/heads/master Commit: 571c50b07ec8a45a2203ec061fb6fbadfa7f3047 Parents: 7ce195a Author: Paul Guo <[email protected]> Authored: Wed Feb 15 16:50:07 2017 +0800 Committer: Paul Guo <[email protected]> Committed: Thu Feb 16 11:30:23 2017 +0800 ---------------------------------------------------------------------- src/backend/cdb/dispatcher_mgt.c | 44 +++++++++++++++++-------- src/backend/cdb/executormgr.c | 62 +++++++++++++++++++++-------------- src/include/cdb/executormgr.h | 1 + 3 files changed, 69 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/571c50b0/src/backend/cdb/dispatcher_mgt.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c index 31c483e..5b1622d 100644 --- a/src/backend/cdb/dispatcher_mgt.c +++ b/src/backend/cdb/dispatcher_mgt.c @@ -281,22 +281,26 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) QueryExecutorInGroupIterator iterator; struct DispatchData *data = group->team->refDispatchData; struct QueryExecutor *executor; + struct QueryExecutor *err_handle_executor = NULL; /* Assume the connections are already set up. */ dispmgt_init_query_executor_in_group_iterator(group, &iterator, false); while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) { + if (err_handle_executor == NULL) + err_handle_executor = executor; + if (workermgr_should_query_stop(state)) { - write_log("+++++++++%s meets should query " - "stop before dispatching, entering error_cleanup", - __func__); + write_log("%s(): query is canceled before dispatching. " + "Will exit and clean up.", __func__); + err_handle_executor = executor; goto error_cleanup; } if (!executormgr_dispatch_and_run(data, executor)) { - write_log("+++++++++%s meets dispatch_and_run " - "problem when dispatching, entering error_cleanup", - __func__); + write_log("%s(): query cannot dispatch and run. " + "Will exit and clean up.", __func__); + err_handle_executor = executor; goto error_cleanup; } } @@ -310,9 +314,8 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) /* Check global state to abort query, this let poll process easier. */ if (workermgr_should_query_stop(state)){ - write_log("%s meets should query stop when " - "polling executors, entering error_cleanup", - __func__); + write_log("%s(): query is canceled before polling executors." + "Will exit and clean up.", __func__); goto error_cleanup; } /* Skip the stopped executor make the logic easy to understand. */ @@ -321,9 +324,10 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) { if (!executormgr_check_segment_status(executor)) { - write_log("Detected one segment (Global ID: %d) is down, so " - "abort the query that is running or will run on it", - executormgr_get_ID(executor)); + write_log("%s(): detected one segment (Global ID: %d) is down, " + "so abort the query that is running or will run on it", + __func__, executormgr_get_ID(executor)); + err_handle_executor = executor; goto error_cleanup; } @@ -359,6 +363,8 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) * System call poll error is only caused by program bug or system * resources unavailable. In this case, fail the query is okay. */ + write_log("%s(): poll() failed with errno: %d. " + "Will exit and clean up.", __func__, SOCK_ERRNO); goto error_cleanup; } @@ -386,7 +392,9 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) continue; if (!executormgr_consume(executor)) { - write_log("%s meets consume error for executor, entering error_cleanup", __func__); + write_log("%s(): fail to consume data. " + "Will exit and clean up.", __func__); + err_handle_executor = executor; goto error_cleanup; } } @@ -416,6 +424,16 @@ error_cleanup: continue; } + /* + * Previously query failed, probably in this executor. We need to + * set error code here if it has not been set although the executor + * is probably fine. This let the main process for the query proceed + * to cancel the query in its thread also, without waiting for a long + * time. We expect the error code have been set previously before jumping + * to error_cleanup. The code below could be the last defence. + */ + executormgr_seterrcode_if_needed(err_handle_executor); + thread_return: return; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/571c50b0/src/backend/cdb/executormgr.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c index 85d3381..772af1a 100644 --- a/src/backend/cdb/executormgr.c +++ b/src/backend/cdb/executormgr.c @@ -472,9 +472,7 @@ executormgr_dispatch_and_run(struct DispatchData *data, QueryExecutor *executor) return true; error: - if (query) - free(query); - write_log("function executormgr_dispatch_and_run meets error."); + free(query); executormgr_catch_error(executor); return false; } @@ -499,6 +497,22 @@ executormgr_check_segment_status(QueryExecutor *executor) return true; } +void +executormgr_seterrcode_if_needed(QueryExecutor *executor) +{ + struct CdbDispatchResult *dispatchResult; + + if (executor == NULL) + return; + + dispatchResult = executormgr_get_executor_result(executor); + if (dispatchResult->errcode == ERRCODE_SUCCESSFUL_COMPLETION || + dispatchResult->errcode == ERRCODE_INTERNAL_ERROR) + { + cdbdisp_seterrcode(ERRCODE_INTERNAL_ERROR, -1, dispatchResult); + } +} + /* * executormgr_consume * If there are data available for executor, use this interface to consume data. @@ -605,16 +619,16 @@ executormgr_discard(QueryExecutor *executor) static void executormgr_catch_error(QueryExecutor *executor) { - PGconn *conn = executor->desc->conn; - char *msg; - int errCode = 0; + PGconn *conn = executor->desc->conn; + char *msg; + int errCode = 0; if (executor->refResult->errcode != 0) - errCode = executor->refResult->errcode; + errCode = executor->refResult->errcode; msg = PQerrorMessage(conn); if (msg && (strcmp("", msg) != 0) && (executor->refResult->errcode == 0)) { - errCode = ERRCODE_GP_INTERCONNECTION_ERROR; + errCode = ERRCODE_GP_INTERCONNECTION_ERROR; } PQExpBufferData selfDesc; @@ -624,23 +638,21 @@ executormgr_catch_error(QueryExecutor *executor) executor->desc->segment->hostname, executor->desc->segment->port); - if (!executor->refResult->error_message) { - cdbdisp_appendMessage( - executor->refResult, - LOG, - errCode, - "%s %s: %s", - (executor->state == QES_DISPATCHABLE ? - "Error dispatching to" : - (executor->state == QES_RUNNING ? - "Query Executor Error in" : "Error in ")), - (executor->desc->whoami && strcmp(executor->desc->whoami, "") != 0) ? - executor->desc->whoami : selfDesc.data, - msg ? msg : "unknown error"); - } - - termPQExpBuffer(&selfDesc); - PQfinish(conn); + if (!executor->refResult->error_message) { + cdbdisp_appendMessage( + executor->refResult, LOG, errCode, + "%s %s: %s", + (executor->state == QES_DISPATCHABLE ? + "Error dispatching to" : + (executor->state == QES_RUNNING ? + "Query Executor Error in" : "Error in ")), + (executor->desc->whoami && strcmp(executor->desc->whoami, "") != 0) ? + executor->desc->whoami : selfDesc.data, + msg ? msg : "unknown error"); + } + + termPQExpBuffer(&selfDesc); + PQfinish(conn); executor->desc->conn = NULL; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/571c50b0/src/include/cdb/executormgr.h ---------------------------------------------------------------------- diff --git a/src/include/cdb/executormgr.h b/src/include/cdb/executormgr.h index 7d9f3c0..dfc230f 100644 --- a/src/include/cdb/executormgr.h +++ b/src/include/cdb/executormgr.h @@ -75,6 +75,7 @@ extern int executormgr_get_fd(struct QueryExecutor *executor); extern bool executormgr_cancel(struct QueryExecutor * executor); extern bool executormgr_dispatch_and_run(struct DispatchData *data, struct QueryExecutor *executor); extern bool executormgr_check_segment_status(struct QueryExecutor *executor); +extern void executormgr_seterrcode_if_needed(struct QueryExecutor *executor); extern bool executormgr_consume(struct QueryExecutor *executor); extern bool executormgr_discard(struct QueryExecutor *executor); extern void executormgr_merge_error(struct QueryExecutor *exeutor);
