Repository: incubator-hawq
Updated Branches:
  refs/heads/master 7ce195a3f -> 571c50b07


HAWQ-1334. The QD thread should set an error code when it fails, so that the main
process for the query can exit promptly


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/571c50b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/571c50b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/571c50b0

Branch: refs/heads/master
Commit: 571c50b07ec8a45a2203ec061fb6fbadfa7f3047
Parents: 7ce195a
Author: Paul Guo <[email protected]>
Authored: Wed Feb 15 16:50:07 2017 +0800
Committer: Paul Guo <[email protected]>
Committed: Thu Feb 16 11:30:23 2017 +0800

----------------------------------------------------------------------
 src/backend/cdb/dispatcher_mgt.c | 44 +++++++++++++++++--------
 src/backend/cdb/executormgr.c    | 62 +++++++++++++++++++++--------------
 src/include/cdb/executormgr.h    |  1 +
 3 files changed, 69 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/571c50b0/src/backend/cdb/dispatcher_mgt.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c
index 31c483e..5b1622d 100644
--- a/src/backend/cdb/dispatcher_mgt.c
+++ b/src/backend/cdb/dispatcher_mgt.c
@@ -281,22 +281,26 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
        QueryExecutorInGroupIterator    iterator;
        struct DispatchData                     *data = 
group->team->refDispatchData;
        struct QueryExecutor                    *executor;
+       struct QueryExecutor                    *err_handle_executor = NULL;
 
        /* Assume the connections are already set up. */
        dispmgt_init_query_executor_in_group_iterator(group, &iterator, false);
        while ((executor = dispmgt_get_query_executor_in_group_iterator(group, 
&iterator)) != NULL)
        {
+               if (err_handle_executor == NULL)
+                       err_handle_executor = executor;
+
                if (workermgr_should_query_stop(state)) {
-                       write_log("+++++++++%s meets should query "
-                                         "stop before dispatching, entering 
error_cleanup",
-                                         __func__);
+                       write_log("%s(): query is canceled before dispatching. "
+                                         "Will exit and clean up.", __func__);
+                       err_handle_executor = executor;
                        goto error_cleanup;
                }
 
                if (!executormgr_dispatch_and_run(data, executor)) {
-                       write_log("+++++++++%s meets dispatch_and_run "
-                                         "problem when dispatching, entering 
error_cleanup",
-                                         __func__);
+                       write_log("%s(): query cannot dispatch and run. "
+                                         "Will exit and clean up.", __func__);
+                       err_handle_executor = executor;
                        goto error_cleanup;
                }
        }
@@ -310,9 +314,8 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
 
                /* Check global state to abort query, this let poll process 
easier. */
                if (workermgr_should_query_stop(state)){
-                       write_log("%s meets should query stop when "
-                                         "polling executors, entering 
error_cleanup",
-                                         __func__);
+                       write_log("%s(): query is canceled before polling 
executors."
+                                         "Will exit and clean up.", __func__);
                        goto error_cleanup;
                }
                /* Skip the stopped executor make the logic easy to understand. 
*/
@@ -321,9 +324,10 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
                {
                        if (!executormgr_check_segment_status(executor))
                        {
-                               write_log("Detected one segment (Global ID: %d) 
is down, so "
-                                                 "abort the query that is 
running or will run on it",
-                                                 executormgr_get_ID(executor));
+                               write_log("%s(): detected one segment (Global 
ID: %d) is down, "
+                                                 "so abort the query that is 
running or will run on it",
+                                                 __func__, 
executormgr_get_ID(executor));
+                               err_handle_executor = executor;
                                goto error_cleanup;
                        }
 
@@ -359,6 +363,8 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
                         * System call poll error is only caused by program bug 
or system
                         * resources unavailable. In this case, fail the query 
is okay.
                         */
+                       write_log("%s(): poll() failed with errno: %d. "
+                                         "Will exit and clean up.", __func__, 
SOCK_ERRNO);
                        goto error_cleanup;
                }
 
@@ -386,7 +392,9 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
                                continue;
 
                        if (!executormgr_consume(executor)) {
-                               write_log("%s meets consume error for executor, 
entering error_cleanup", __func__);
+                               write_log("%s(): fail to consume data. "
+                                                 "Will exit and clean up.", 
__func__);
+                               err_handle_executor = executor;
                                goto error_cleanup;
                        }
                }
@@ -416,6 +424,16 @@ error_cleanup:
                        continue;
        }
 
+       /*
+        * Previously query failed, probably in this executor. We need to
+        * set error code here if it has not been set although the executor
+        * is probably fine. This let the main process for the query proceed
+        * to cancel the query in its thread also, without waiting for a long
+        * time. We expect the error code have been set previously before 
jumping
+        * to error_cleanup. The code below could be the last defence.
+        */
+       executormgr_seterrcode_if_needed(err_handle_executor);
+
 thread_return:
        return;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/571c50b0/src/backend/cdb/executormgr.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c
index 85d3381..772af1a 100644
--- a/src/backend/cdb/executormgr.c
+++ b/src/backend/cdb/executormgr.c
@@ -472,9 +472,7 @@ executormgr_dispatch_and_run(struct DispatchData *data, 
QueryExecutor *executor)
        return true;
 
 error:
-       if (query)
-               free(query);
-       write_log("function executormgr_dispatch_and_run meets error.");
+       free(query);
        executormgr_catch_error(executor);
        return false;
 }
@@ -499,6 +497,22 @@ executormgr_check_segment_status(QueryExecutor *executor)
        return true;
 }
 
+void
+executormgr_seterrcode_if_needed(QueryExecutor *executor)
+{
+       struct CdbDispatchResult *dispatchResult;
+ 
+       if (executor == NULL)
+               return;
+
+       dispatchResult = executormgr_get_executor_result(executor);
+    if (dispatchResult->errcode == ERRCODE_SUCCESSFUL_COMPLETION ||
+               dispatchResult->errcode == ERRCODE_INTERNAL_ERROR)
+       {
+               cdbdisp_seterrcode(ERRCODE_INTERNAL_ERROR, -1, dispatchResult);
+       }
+}
+
 /*
  * executormgr_consume
  *     If there are data available for executor, use this interface to consume 
data.
@@ -605,16 +619,16 @@ executormgr_discard(QueryExecutor *executor)
 static void
 executormgr_catch_error(QueryExecutor *executor)
 {
-       PGconn                  *conn = executor->desc->conn;
-       char                    *msg;
-       int       errCode = 0;
+       PGconn  *conn = executor->desc->conn;
+       char    *msg;
+       int             errCode = 0;
        if (executor->refResult->errcode != 0)
-         errCode = executor->refResult->errcode;
+               errCode = executor->refResult->errcode;
 
        msg = PQerrorMessage(conn);
 
        if (msg && (strcmp("", msg) != 0) && (executor->refResult->errcode == 
0)) {
-         errCode = ERRCODE_GP_INTERCONNECTION_ERROR;
+               errCode = ERRCODE_GP_INTERCONNECTION_ERROR;
        }
 
        PQExpBufferData selfDesc;
@@ -624,23 +638,21 @@ executormgr_catch_error(QueryExecutor *executor)
                          executor->desc->segment->hostname,
                          executor->desc->segment->port);
 
-  if (!executor->refResult->error_message) {
-    cdbdisp_appendMessage(
-        executor->refResult,
-        LOG,
-        errCode,
-        "%s %s: %s",
-        (executor->state == QES_DISPATCHABLE ?
-            "Error dispatching to" :
-            (executor->state == QES_RUNNING ?
-                "Query Executor Error in" : "Error in ")),
-        (executor->desc->whoami && strcmp(executor->desc->whoami, "") != 0) ?
-            executor->desc->whoami : selfDesc.data,
-        msg ? msg : "unknown error");
-  }
-
-  termPQExpBuffer(&selfDesc);
-  PQfinish(conn);
+       if (!executor->refResult->error_message) {
+               cdbdisp_appendMessage(
+                               executor->refResult, LOG, errCode,
+                               "%s %s: %s",
+                               (executor->state == QES_DISPATCHABLE ?
+                                       "Error dispatching to" :
+                                       (executor->state == QES_RUNNING ?
+                                               "Query Executor Error in" : 
"Error in ")),
+                               (executor->desc->whoami && 
strcmp(executor->desc->whoami, "") != 0) ?
+                                       executor->desc->whoami : selfDesc.data,
+                               msg ? msg : "unknown error");
+                                 }
+
+       termPQExpBuffer(&selfDesc);
+       PQfinish(conn);
 
        executor->desc->conn = NULL;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/571c50b0/src/include/cdb/executormgr.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/executormgr.h b/src/include/cdb/executormgr.h
index 7d9f3c0..dfc230f 100644
--- a/src/include/cdb/executormgr.h
+++ b/src/include/cdb/executormgr.h
@@ -75,6 +75,7 @@ extern int    executormgr_get_fd(struct QueryExecutor 
*executor);
 extern bool    executormgr_cancel(struct QueryExecutor * executor);
 extern bool    executormgr_dispatch_and_run(struct DispatchData *data, struct 
QueryExecutor *executor);
 extern bool    executormgr_check_segment_status(struct QueryExecutor 
*executor);
+extern void    executormgr_seterrcode_if_needed(struct QueryExecutor 
*executor);
 extern bool    executormgr_consume(struct QueryExecutor *executor);
 extern bool    executormgr_discard(struct QueryExecutor *executor);
 extern void    executormgr_merge_error(struct QueryExecutor *exeutor);

Reply via email to