Repository: incubator-hawq
Updated Branches:
  refs/heads/master 17a9dc1e5 -> 55e5ab5a3


HAWQ-1326. Cancel the query earlier if one of the segments for the query crashes


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/55e5ab5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/55e5ab5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/55e5ab5a

Branch: refs/heads/master
Commit: 55e5ab5a3eadfa9ba022db86826980cda870f1ce
Parents: 17a9dc1
Author: Paul Guo <[email protected]>
Authored: Mon Feb 6 19:54:57 2017 +0800
Committer: Paul Guo <[email protected]>
Committed: Tue Feb 14 17:12:27 2017 +0800

----------------------------------------------------------------------
 src/backend/cdb/dispatcher_mgt.c              | 32 +++++----
 src/backend/cdb/executormgr.c                 | 32 ++++++++-
 src/backend/resourcemanager/include/dynrm.h   |  8 +++
 src/backend/resourcemanager/resourcemanager.c | 77 ++++++++++++++++++++++
 src/backend/resourcemanager/resourcepool.c    | 30 +++++++++
 src/backend/storage/ipc/ipci.c                |  6 +-
 src/backend/tcop/pquery.c                     | 13 ++--
 src/include/cdb/cdbutil.h                     |  3 +
 src/include/cdb/executormgr.h                 |  2 +
 9 files changed, 181 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/cdb/dispatcher_mgt.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c
index 8ed3402..31c483e 100644
--- a/src/backend/cdb/dispatcher_mgt.c
+++ b/src/backend/cdb/dispatcher_mgt.c
@@ -42,7 +42,6 @@
 #endif
 #include "cdb/cdbconn.h"               /* SOCK_ERRNO */
 
-
 typedef enum DispMgtConstant {
        DISPMGT_POLL_TIME = 2 * 1000,
 } DispMgtConstant;
@@ -288,14 +287,16 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
        while ((executor = dispmgt_get_query_executor_in_group_iterator(group, 
&iterator)) != NULL)
        {
                if (workermgr_should_query_stop(state)) {
-                       write_log("+++++++++dispmgr_thread_func_run meets 
should query "
-                                         "stop before dispatching, entering 
error_cleanup");
+                       write_log("+++++++++%s meets should query "
+                                         "stop before dispatching, entering 
error_cleanup",
+                                         __func__);
                        goto error_cleanup;
                }
 
                if (!executormgr_dispatch_and_run(data, executor)) {
-                       write_log("+++++++++dispmgr_thread_func_run meets 
dispatch_and_run "
-                                         "problem when dispatching, entering 
error_cleanup");
+                       write_log("+++++++++%s meets dispatch_and_run "
+                                         "problem when dispatching, entering 
error_cleanup",
+                                         __func__);
                        goto error_cleanup;
                }
        }
@@ -309,14 +310,23 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
 
                /* Check global state to abort query, this let poll process 
easier. */
                if (workermgr_should_query_stop(state)){
-                       write_log("dispmgr_thread_func_run meets should query 
stop when "
-                                         "polling executors, entering 
error_cleanup");
+                       write_log("%s meets should query stop when "
+                                         "polling executors, entering 
error_cleanup",
+                                         __func__);
                        goto error_cleanup;
                }
                /* Skip the stopped executor make the logic easy to understand. 
*/
                dispmgt_init_query_executor_in_group_iterator(group, &iterator, 
true);
                while ((executor = 
dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL)
                {
+                       if (!executormgr_check_segment_status(executor))
+                       {
+                               write_log("Detected one segment (Global ID: %d) 
is down, so "
+                                                 "abort the query that is 
running or will run on it",
+                                                 executormgr_get_ID(executor));
+                               goto error_cleanup;
+                       }
+
                        /*
                         * The fds array may shorter than executor array.
                         * DO NOT mark executor stop!
@@ -367,7 +377,7 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
                dispmgt_init_query_executor_in_group_iterator(group, &iterator, 
true);
                while ((executor = 
dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL)
                {
-                       int     sockfd;
+                       int __MAYBE_UNUSED sockfd;
 
                        sockfd = executormgr_get_fd(executor);
                        /* TODO: is that safe to call Assert() in a thread ? */
@@ -376,7 +386,7 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct 
WorkerMgrState *state)
                                continue;
 
                        if (!executormgr_consume(executor)) {
-                               write_log("dispmgr_thread_func_run meets 
consume error for executor, entering error_cleanup");
+                               write_log("%s meets consume error for executor, 
entering error_cleanup", __func__);
                                goto error_cleanup;
                        }
                }
@@ -388,7 +398,7 @@ error_cleanup:
         * 1. query cancel, result error, and poll error: mark the executor 
stop.
         * 2. connection error: mark the gang error. Set by 
workermgr_mark_executor_error().
         */
-  workermgr_set_state_cancel(state);
+       workermgr_set_state_cancel(state);
        dispmgt_init_query_executor_in_group_iterator(group, &iterator, false);
        while ((executor = dispmgt_get_query_executor_in_group_iterator(group, 
&iterator)) != NULL)
        {
@@ -514,8 +524,6 @@ dispmgt_concurrent_connect(List     *executors, int 
executors_num_per_thread)
        }
        PG_CATCH();
        {
-               ListCell        *lc;
-
                workermgr_cancel_job(state);
                /* We have to clean up the executors. */
                dispmgt_free_concurrent_connect_state(tasks);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/cdb/executormgr.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c
index f8efb3d..85d3381 100644
--- a/src/backend/cdb/executormgr.c
+++ b/src/backend/cdb/executormgr.c
@@ -42,6 +42,8 @@
 #include "utils/guc_tables.h"  /* TODO: manipulate gucs */
 #include "portability/instr_time.h"    /* Monitor the dispatcher performance */
 
+#include "resourcemanager/dynrm.h"
+
 typedef enum ExecutorMgrConstant {
        EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE = 256,
 } ExecutorMgrConstant;
@@ -320,6 +322,12 @@ executormgr_get_executor_result(QueryExecutor *executor)
 }
 
 int
+executormgr_get_ID(QueryExecutor *executor)
+{
+       return executor->desc->segment->ID;
+}
+
+int
 executormgr_get_fd(QueryExecutor *executor)
 {
        return PQsocket(executor->desc->conn);
@@ -399,7 +407,7 @@ executormgr_is_dispatchable(QueryExecutor *executor)
 
        if (!executormgr_validate_conn(conn) || PQstatus(conn) == 
CONNECTION_BAD)
        {
-               write_log("function executormgr_is_dispatchable meets error, 
connection is bad.");
+               write_log("function %s meets error, connection is bad.", 
__func__);
                executormgr_catch_error(executor);
                return false;
        }
@@ -471,6 +479,26 @@ error:
        return false;
 }
 
+bool
+executormgr_check_segment_status(QueryExecutor *executor)
+{
+       /*
+        * Cancel the query if a segment is down. Otherwise QEs could hang in the
+        * interconnect until timeout when one segment is down, which would keep
+        * the QD polling until the QEs time out.
+        */
+       int ID = executormgr_get_ID(executor);
+
+       if (IsSegmentDown(ID))
+       {
+               cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1,
+                                                  
executormgr_get_executor_result(executor));
+               return false;
+       }
+
+       return true;
+}
+
 /*
  * executormgr_consume
  *     If there are data available for executor, use this interface to consume 
data.
@@ -547,7 +575,7 @@ executormgr_consume(QueryExecutor *executor)
 
 connection_error:
        /* Let caller deal with connection error. */
-       write_log("function executormgr_consume meets error, connection is 
bad.");
+       write_log("function %s meets error, connection is bad.", __func__);
        executormgr_catch_error(executor);
        return false;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h 
b/src/backend/resourcemanager/include/dynrm.h
index a6309c8..60b6c6a 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -324,4 +324,12 @@ int  MainHandlerLoop_RMSEG(void);
 void checkAndBuildFailedTmpDirList(void);
 
 void switchIMAliveTarget(void);
+
+int SegmentStatus_ShmSize(void);
+void SegmentStatusShmemReset(void);
+void SegmentStatusShmemInit(void);
+void MarkSegmentDown(int x);
+void MarkSegmentUp(int x);
+bool IsSegmentDown(int x);
+
 #endif //DYNAMIC_RESOURCE_MANAGEMENT_H

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c 
b/src/backend/resourcemanager/resourcemanager.c
index dda4271..652c83f 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -20,6 +20,7 @@
 #include "envswitch.h"
 #include "dynrm.h"
 #include "utils/hashtable.h"
+#include "cdb/cdbfts.h"
 
 #include "resourcemanager/resourcemanager.h"
 #include "resourceenforcer/resourceenforcer.h"
@@ -67,6 +68,11 @@
 extern bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace);
 static char *probeDatabase = "template1";
 
+/* Bitmap to monitor segment health info (up/down). */
+bits8 *shm_segment_status;
+void MarkSegmentUp(int x);
+void MarkSegmentDown(int x);
+
 int  loadAllQueueAndUser(void);
 int  loadHostInformationIntoResourcePool(void);
 
@@ -3014,3 +3020,74 @@ bool cleanedAllGRMContainers(void)
        return PRESPOOL->AddPendingContainerCount == 0 &&
                   PRESPOOL->RetPendingContainerCount == 0;
 }
+
+int SegmentStatus_ShmSize(void)
+{
+       return (FTS_MAX_DBS >> 3) + 1;
+}
+
+void SegmentStatusShmemReset(void)
+{
+       /* For each bit, 0 means an invalid node (down or non-existent). */
+       MemSet(shm_segment_status, 0, SegmentStatus_ShmSize());
+}
+
+void SegmentStatusShmemInit(void)
+{
+       bool found;
+
+       shm_segment_status = (bits8 *)ShmemInitStruct(
+               "Segment status (up/down. Monitored by RM)", 
SegmentStatus_ShmSize(), &found);
+
+       if (!shm_segment_status)
+               elog(FATAL, "could not initialize segment status bitmap (shared 
memory).");
+
+       if (found)
+               return;
+
+       SegmentStatusShmemReset();
+}
+
+void MarkSegmentDown(int x)
+{
+       int octnum, bitnum;
+
+    if (x < 0 || x >= FTS_MAX_DBS)
+        elog(ERROR, "Segment ID is out of range (%d)", x);
+
+       octnum = x >> 3;
+       bitnum = x & 0x7;
+
+       /* Clear the bit. */
+       shm_segment_status[octnum] &= ~(1 << bitnum);
+}
+
+void MarkSegmentUp(int x)
+{
+       int octnum, bitnum;
+
+    if (x < 0 || x >= FTS_MAX_DBS)
+        elog(ERROR, "Segment ID is out of range (%d)", x);
+
+       octnum = x >> 3;
+       bitnum = x & 0x7;
+
+       /* Set the bit. */
+       shm_segment_status[octnum] |= (1 << bitnum);
+}
+
+bool IsSegmentDown(int x)
+{
+       int octnum, bitnum;
+
+    if (x < 0 || x >= FTS_MAX_DBS)
+        elog(ERROR, "Segment ID is out of range (%d)", x);
+
+       octnum = x >> 3;
+       bitnum = x & 0x7;
+
+       if ((shm_segment_status[octnum] & (1 << bitnum)) == 0)
+               return true;
+
+       return false;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c 
b/src/backend/resourcemanager/resourcepool.c
index 63357b4..9485dbf 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -78,6 +78,8 @@ int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, 
void *val2);
 int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2);
 int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2);
 
+extern void MarkSegmentUp(int x);
+
 /*
  * The balanced BST index comparing function. The segment containing most
  * available resource is ordered at the left most, the segment not in
@@ -411,6 +413,8 @@ void cleanup_segment_config()
        PQExpBuffer sql = NULL;
        PGresult* result = NULL;
 
+       SegmentStatusShmemReset();
+
        sprintf(conninfo, "options='-c gp_session_role=UTILITY -c 
allow_system_table_mods=dml' "
                        "dbname=template1 port=%d connect_timeout=%d", 
master_addr_port, CONNECT_TIMEOUT);
        conn = PQconnectdb(conninfo);
@@ -647,6 +651,19 @@ void update_segment_status(int32_t id, char status, char* 
description)
        else if (status == SEGMENT_STATUS_DOWN)
                Assert(strlen(description) != 0);
 
+       /* For segment nodes only. */
+       if (id >= REGISTRATION_ORDER_OFFSET)
+       {
+               if (status == SEGMENT_STATUS_UP)
+                       MarkSegmentUp(id - REGISTRATION_ORDER_OFFSET);
+               else if (status == SEGMENT_STATUS_DOWN)
+                       MarkSegmentDown(id - REGISTRATION_ORDER_OFFSET);
+               else
+                       elog(ERROR, "Unrecognized segment status character: 
'%c' "
+                                               "(Should be one of '%c' and 
'%c')", status,
+                                               SEGMENT_STATUS_UP, 
SEGMENT_STATUS_DOWN);
+       }
+
        sprintf(conninfo, "options='-c gp_session_role=UTILITY -c 
allow_system_table_mods=dml' "
                        "dbname=template1 port=%d connect_timeout=%d", 
master_addr_port, CONNECT_TIMEOUT);
        conn = PQconnectdb(conninfo);
@@ -814,6 +831,19 @@ void add_segment_config_row(int32_t id,
        PQExpBuffer sql = NULL;
        PGresult* result = NULL;
 
+       /* For segment nodes only. */
+       if (id >= REGISTRATION_ORDER_OFFSET)
+       {
+               if (status == SEGMENT_STATUS_UP)
+                       MarkSegmentUp(id - REGISTRATION_ORDER_OFFSET);
+               else if (status == SEGMENT_STATUS_DOWN)
+                       MarkSegmentDown(id - REGISTRATION_ORDER_OFFSET);
+               else
+                       elog(ERROR, "Unrecognized segment status character: 
'%c' "
+                                               "(Should be one of '%c' and 
'%c')", status,
+                                               SEGMENT_STATUS_UP, 
SEGMENT_STATUS_DOWN);
+       }
+
        sprintf(conninfo, "options='-c gp_session_role=UTILITY -c 
allow_system_table_mods=dml' "
                        "dbname=template1 port=%d connect_timeout=%d", 
master_addr_port, CONNECT_TIMEOUT);
        conn = PQconnectdb(conninfo);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/storage/ipc/ipci.c
----------------------------------------------------------------------
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index ad73af1..f65f0ff 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -66,6 +66,8 @@
 #include "cdb/cdbtmpdir.h"
 #include "utils/session_state.h"
 
+#include "resourcemanager/dynrm.h"
+
 shmem_startup_hook_type shmem_startup_hook = NULL;
 
 static Size total_addin_request = 0;
@@ -168,6 +170,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
                size = add_size(size, PersistentRelfile_ShmemSize());
                size = add_size(size, Pass2Recovery_ShmemSize());
                size = add_size(size, FSCredShmemSize());
+               size = add_size(size, SegmentStatus_ShmSize());
 
         if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
         {
@@ -363,6 +366,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 
        FSCredShmemInit();
 
+       SegmentStatusShmemInit();
 #ifdef EXEC_BACKEND
 
        /*
@@ -371,7 +375,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
        if (!IsUnderPostmaster)
                ShmemBackendArrayAllocation();
 #endif
-       
+
        SPI_InitMemoryReservation();
        
        /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/tcop/pquery.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index efa5bce..0a5b9d5 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -832,7 +832,8 @@ AllocateResource(QueryResourceLife   life,
                        seg->hdfsHostname       = qdseginfo->QD_HdfsHostName == 
NULL ?
                                                                  NULL :
                                                                  
pstrdup(qdseginfo->QD_HdfsHostName);
-                       seg->segindex           = i;
+                       seg->segindex           = i;
+                       seg->ID                         = 
qdseginfo->QD_SegInfo->ID;
                        seg->master             = qdseginfo->QD_SegInfo->master;
                        seg->port               = qdseginfo->QD_SegInfo->port;
                        seg->standby            = 
qdseginfo->QD_SegInfo->standby;
@@ -849,8 +850,8 @@ AllocateResource(QueryResourceLife   life,
 
                        elog(DEBUG3, "Get allocated segment located at : %s:%d,"
                                          "Address:%s,"
-                                         "Master:%d,Standby:%d,Alive:%d,ID:%d, 
"
-                                         "HDFS Host:%s",
+                                         
"Master:%d,Standby:%d,Alive:%d,internl index:%d, "
+                                         "Global ID:%d, HDFS Host:%s",
                                          seg->hostname,
                                          seg->port,
                                          seg->hostip,
@@ -858,6 +859,7 @@ AllocateResource(QueryResourceLife   life,
                                          seg->standby,
                                          seg->alive,
                                          seg->segindex,
+                                         seg->ID,
                                          (seg->hdfsHostname == NULL ? "NULL" : 
seg->hdfsHostname));
                }
        }
@@ -959,7 +961,6 @@ static void
 RemoveFromGlobalQueryResources(int resourceId, QueryResourceLife life)
 {
   ListCell *lc;
-  QueryResourceItem *newItem;
   MemoryContext old;
 
   if (life == QRL_NONE)
@@ -991,8 +992,6 @@ static void
 SetResourcesAllocatedSucceed(int resourceId, QueryResourceLife life)
 {
        ListCell *lc;
-       QueryResourceItem *newItem;
-       MemoryContext old;
 
        if (life == QRL_NONE)
        {
@@ -1019,7 +1018,7 @@ FreeResource(QueryResource *resource)
        ListCell        *lc;
        int                     ret;
        char            errorbuf[1024];
-       bool found = false;
+       bool __MAYBE_UNUSED found = false;
 
        if (!resource)
        {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/include/cdb/cdbutil.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbutil.h b/src/include/cdb/cdbutil.h
index 23b4acc..7962067 100644
--- a/src/include/cdb/cdbutil.h
+++ b/src/include/cdb/cdbutil.h
@@ -127,6 +127,9 @@ typedef struct Segment {
        bool    alive;
 
        int             segindex;
+
+       /* Globally unique segment ID. */
+       int             ID;
 } Segment;
 
 extern Segment *GetMasterSegment(void);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/include/cdb/executormgr.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/executormgr.h b/src/include/cdb/executormgr.h
index d1c562e..7d9f3c0 100644
--- a/src/include/cdb/executormgr.h
+++ b/src/include/cdb/executormgr.h
@@ -70,9 +70,11 @@ extern void executormgr_get_executor_connection_info(struct 
QueryExecutor *execu
 extern bool    executormgr_is_stop(struct QueryExecutor *executor);
 extern bool    executormgr_has_error(struct QueryExecutor *executor);
 extern int     executormgr_get_executor_slice_id(struct QueryExecutor 
*executor);
+extern int     executormgr_get_ID(struct QueryExecutor *executor);
 extern int     executormgr_get_fd(struct QueryExecutor *executor);
 extern bool    executormgr_cancel(struct QueryExecutor * executor);
 extern bool    executormgr_dispatch_and_run(struct DispatchData *data, struct 
QueryExecutor *executor);
+extern bool    executormgr_check_segment_status(struct QueryExecutor 
*executor);
 extern bool    executormgr_consume(struct QueryExecutor *executor);
 extern bool    executormgr_discard(struct QueryExecutor *executor);
 extern void    executormgr_merge_error(struct QueryExecutor *exeutor);

Reply via email to