Repository: incubator-hawq
Updated Branches:
  refs/heads/master 6616ba71b -> 740e04dfe


HAWQ-1436. Implement ranger plugin service High Availability.
    1. The master will connect to the standby RPS for policy search if the
RPS on the master fails;
    2. If the master has been talking to the standby RPS for a period
(controlled by the GUC hawq_rps_check_local_interval, 5 minutes by default),
it will try to connect to the local RPS again.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/740e04df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/740e04df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/740e04df

Branch: refs/heads/master
Commit: 740e04dfe1d4ed7b4faf3d6c7f22d4dd7e4d3e08
Parents: 6616ba7
Author: Wen Lin <[email protected]>
Authored: Tue May 9 16:27:16 2017 +0800
Committer: Wen Lin <[email protected]>
Committed: Tue May 9 16:27:16 2017 +0800

----------------------------------------------------------------------
 src/backend/libpq/rangerrest.c | 130 ++++++++++++++++++++++++------------
 src/backend/tcop/postgres.c    |   2 +
 src/backend/utils/misc/guc.c   |  27 +++++---
 src/include/utils/guc.h        |   3 +
 src/include/utils/rangerrest.h |   2 +
 5 files changed, 111 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/740e04df/src/backend/libpq/rangerrest.c
----------------------------------------------------------------------
diff --git a/src/backend/libpq/rangerrest.c b/src/backend/libpq/rangerrest.c
index 6457fd1..a8a7324 100644
--- a/src/backend/libpq/rangerrest.c
+++ b/src/backend/libpq/rangerrest.c
@@ -383,58 +383,100 @@ static size_t write_callback(char *contents, size_t 
size, size_t nitems,
 static int call_ranger_rest(CURL_HANDLE curl_handle, const char* request)
 {
        int ret = -1;
+       int retry = 2;
        CURLcode res;
        Assert(request != NULL);
 
        /*
-        * Re-initializes all options previously set on a specified CURL handle
-        * to the default values. This puts back the handle to the same state as
-        * it was in when it was just created with curl_easy_init.It does not
-        * change the following information kept in the handle: live 
connections,
-        * the Session ID cache, the DNS cache, the cookies and shares.
+        * If master is talking with standby RPS, for every predefined interval
+        * (controlled by a GUC hawq_rps_check_local_interval) it will check if 
local RPS works now.
         */
-       curl_easy_reset(curl_handle->curl_handle);
-       /* timeout: hard-coded temporarily and maybe should be a guc in future 
*/
-       curl_easy_setopt(curl_handle->curl_handle, CURLOPT_TIMEOUT, 30L);
-
-       /* specify URL to get */
-       StringInfoData tname;
-       initStringInfo(&tname);
-       appendStringInfo(&tname, "http://";);
-       appendStringInfo(&tname, "%s", master_addr_host);
-       appendStringInfo(&tname, ":");
-       appendStringInfo(&tname, "%d", rps_addr_port);
-       appendStringInfo(&tname, "/");
-       appendStringInfo(&tname, "%s", "rps");
-       curl_easy_setopt(curl_handle->curl_handle, CURLOPT_URL, tname.data);
-       pfree(tname.data);      
-
-       struct curl_slist *headers = NULL;
-       headers = curl_slist_append(headers, "Content-Type:application/json");
-       curl_easy_setopt(curl_handle->curl_handle, CURLOPT_HTTPHEADER, headers);
-
-       curl_easy_setopt(curl_handle->curl_handle, CURLOPT_POSTFIELDS,request);
-       /* send all data to this function  */
-       curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEFUNCTION, 
write_callback);
-       curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEDATA, (void 
*)curl_handle);
-
-       res = curl_easy_perform(curl_handle->curl_handle);
-       if(request_id == INT_MAX)
+       if (curl_handle->talkingWithStandby)
        {
-               request_id = 0;
-       }
-       request_id++;
-       /* check for errors */
-       if(res != CURLE_OK)
-       {
-               elog(ERROR, "ranger plugin service from http://%s:%d/rps is 
unavailable : %s.\n",
-                               master_addr_host, rps_addr_port, 
curl_easy_strerror(res));
+               uint64_t current_time = gettime_microsec();
+               if ((current_time - curl_handle->lastCheckTimestamp) > 
1000000LL * rps_check_local_interval)
+               {
+                       curl_handle->talkingWithStandby = false;
+                       curl_handle->lastCheckTimestamp = 0;
+                       elog(RANGER_LOG,
+                               "master has been talking to standby RPS for 
more than %d seconds, try switching to master RPS",
+                               rps_check_local_interval);
+               }
        }
-       else
+
+       /*
+        * try to connect standby's RPS if fail in connecting master's RPS
+        */
+       while(retry > 0 && ret != 0)
        {
-               ret = 0;
-               elog(RANGER_LOG, "retrieved %d bytes data from ranger restful 
response.",
-                       curl_handle->response.response_size);
+               /*
+                * Re-initializes all options previously set on a specified 
CURL handle
+                * to the default values. This puts back the handle to the same 
state as
+                * it was in when it was just created with curl_easy_init.It 
does not
+                * change the following information kept in the handle: live 
connections,
+                * the Session ID cache, the DNS cache, the cookies and shares.
+                */
+               curl_easy_reset(curl_handle->curl_handle);
+               /* timeout: hard-coded temporarily and maybe should be a guc in 
future */
+               curl_easy_setopt(curl_handle->curl_handle, CURLOPT_TIMEOUT, 
30L);
+
+               /* specify URL to get */
+               StringInfoData tname;
+               initStringInfo(&tname);
+               appendStringInfo(&tname, "http://";);
+               appendStringInfo(&tname, "%s", 
curl_handle->talkingWithStandby?standby_addr_host:master_addr_host);
+               appendStringInfo(&tname, ":");
+               appendStringInfo(&tname, "%d", rps_addr_port);
+               appendStringInfo(&tname, "/rps");
+               curl_easy_setopt(curl_handle->curl_handle, CURLOPT_URL, 
tname.data);
+               pfree(tname.data);
+
+               struct curl_slist *headers = NULL;
+               headers = curl_slist_append(headers, 
"Content-Type:application/json");
+               curl_easy_setopt(curl_handle->curl_handle, CURLOPT_HTTPHEADER, 
headers);
+
+               curl_easy_setopt(curl_handle->curl_handle, 
CURLOPT_POSTFIELDS,request);
+               /* send all data to this function  */
+               curl_easy_setopt(curl_handle->curl_handle, 
CURLOPT_WRITEFUNCTION, write_callback);
+               curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEDATA, 
(void *)curl_handle);
+
+               res = curl_easy_perform(curl_handle->curl_handle);
+               if(request_id == INT_MAX)
+               {
+                       request_id = 0;
+               }
+               request_id++;
+               /* check for errors */
+               if(res != CURLE_OK)
+               {
+                       if (retry > 1)
+                       {
+                               elog(WARNING, "ranger plugin service from 
http://%s:%d/rps is unavailable : %s, try another http://%s:%d/rps\n";,
+                                               
curl_handle->talkingWithStandby?standby_addr_host:master_addr_host, 
rps_addr_port, curl_easy_strerror(res),
+                                               
curl_handle->talkingWithStandby?master_addr_host:standby_addr_host, 
rps_addr_port);
+                               curl_handle->talkingWithStandby = 
!curl_handle->talkingWithStandby;
+                       }
+                       else
+                       {
+                               elog(ERROR, "ranger plugin service from 
http://%s:%d/rps is unavailable : %s.\n",
+                                               
curl_handle->talkingWithStandby?standby_addr_host:master_addr_host, 
rps_addr_port, curl_easy_strerror(res));
+                       }
+               }
+               else
+               {
+                       if (curl_handle->talkingWithStandby && 
curl_handle->lastCheckTimestamp == 0)
+                       {
+                               curl_handle->lastCheckTimestamp = 
gettime_microsec();
+                       }
+                       else if (!curl_handle->talkingWithStandby && 
curl_handle->lastCheckTimestamp != 0)
+                       {
+                               curl_handle->lastCheckTimestamp = 0;
+                       }
+                       ret = 0;
+                       elog(RANGER_LOG, "retrieved %d bytes data from ranger 
restful response.",
+                               curl_handle->response.response_size);
+               }
+               retry--;
        }
 
        return ret;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/740e04df/src/backend/tcop/postgres.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 40983a0..488b8e1 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4436,6 +4436,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                        elog(ERROR, "initialize global curl context failed.");
                }
                curl_context_ranger.hasInited = true;
+               curl_context_ranger.talkingWithStandby = false;
+               curl_context_ranger.lastCheckTimestamp = 0;
                curl_context_ranger.response.buffer = 
palloc0(CURL_RES_BUFFER_SIZE);
                curl_context_ranger.response.buffer_size = CURL_RES_BUFFER_SIZE;
                elog(RANGER_LOG, "initialize global curl context for privileges 
check.");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/740e04df/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6a443ec..6769d3b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -783,8 +783,8 @@ bool gp_plpgsql_clear_cache_always = false;
 bool gp_called_by_pgdump = false;
 
 char   *acl_type;
-
-int     rps_addr_port;
+int    rps_addr_port;
+int    rps_check_local_interval;
 
 /*
  * Displayable names for context types (enum GucContext)
@@ -6266,13 +6266,22 @@ static struct config_int ConfigureNamesInt[] =
        },
 
        {
-    {"hawq_rps_address_port", PGC_POSTMASTER, PRESET_OPTIONS,
-      gettext_noop("ranger plugin server address port number"),
-      NULL
-    },
-    &rps_addr_port,
-    8432, 1, 65535, NULL, NULL
-  },
+               {"hawq_rps_address_port", PGC_POSTMASTER, PRESET_OPTIONS,
+                       gettext_noop("ranger plugin server address port 
number"),
+                       NULL
+               },
+               &rps_addr_port,
+               8432, 1, 65535, NULL, NULL
+       },
+
+       {
+               {"hawq_rps_check_local_interval", PGC_POSTMASTER, 
PRESET_OPTIONS,
+                       gettext_noop("interval of checking master's RPS if 
talking with standby's RPS"),
+                       NULL
+               },
+               &rps_check_local_interval,
+               300, 1, 65535, NULL, NULL
+       },
 
        {
                {"hawq_segment_address_port", PGC_POSTMASTER, PRESET_OPTIONS,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/740e04df/src/include/utils/guc.h
----------------------------------------------------------------------
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index a5b8214..546584f 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -459,6 +459,9 @@ extern char   *acl_type;
 
 /* rps port*/
 extern int     rps_addr_port;
+
+/* interval of checking local RPS */
+extern int     rps_check_local_interval;
 /*
  * During insertion in a table with parquet partitions,
  * require tuples to be sorted by partition key.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/740e04df/src/include/utils/rangerrest.h
----------------------------------------------------------------------
diff --git a/src/include/utils/rangerrest.h b/src/include/utils/rangerrest.h
index 9b086c5..9fdf686 100644
--- a/src/include/utils/rangerrest.h
+++ b/src/include/utils/rangerrest.h
@@ -69,6 +69,8 @@ typedef struct curl_context_t
   char* last_http_reponse;
 
   bool hasInited;
+  bool talkingWithStandby;      /* if it is talking with standby RPS or not */
+  uint64_t lastCheckTimestamp;  /* last check timestamp for master RPS */
 } curl_context_t;
 
 typedef curl_context_t* CURL_HANDLE;

Reply via email to