Repository: incubator-hawq
Updated Branches:
  refs/heads/master 2f95286e0 -> 24d4d967e


HAWQ-927. Pass ProjectionInfo data to PXF

This commit makes the necessary modifications to the HAWQ side of
the codebase to send PXF the list of indices of projected columns
(in the X-GP-ATTRS-PROJ and X-GP-ATTRS-PROJ-IDX request headers).


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/24d4d967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/24d4d967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/24d4d967

Branch: refs/heads/master
Commit: 24d4d967e767fc640ad6ab3c91b2ce26bb96de66
Parents: 2f95286
Author: Kavinder Dhaliwal <[email protected]>
Authored: Mon Jul 11 18:20:16 2016 -0700
Committer: Kavinder Dhaliwal <[email protected]>
Committed: Wed Jul 20 16:36:08 2016 -0700

----------------------------------------------------------------------
 src/backend/access/external/fileam.c     | 37 +++++++++++++++++----------
 src/backend/access/external/pxfheaders.c | 35 +++++++++++++++++++++++--
 src/backend/access/external/url.c        | 15 ++++++-----
 src/backend/executor/nodeExternalscan.c  |  6 +++--
 src/bin/gpfusion/gpbridgeapi.c           |  2 ++
 src/include/access/extprotocol.h         |  5 +++-
 src/include/access/fileam.h              | 14 +++++++++-
 src/include/access/pxfheaders.h          |  1 +
 src/include/access/url.h                 |  2 +-
 9 files changed, 91 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 645e6dc..70a115a 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -79,7 +79,7 @@
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
 
-static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir);
+static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, 
ExternalSelectDesc desc);
 static void InitParseState(CopyState pstate, Relation relation,
                                                   Datum* values, bool* nulls, 
bool writable,
                                                   List *fmtOpts, char fmtType,
@@ -97,7 +97,7 @@ static void 
FunctionCallPrepareFormatter(FunctionCallInfoData*        fcinfo,
 
 static void open_external_readable_source(FileScanDesc scan);
 static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
-static int     external_getdata(URL_FILE *extfile, CopyState pstate, int 
maxread);
+static int     external_getdata(URL_FILE *extfile, CopyState pstate, int 
maxread, ExternalSelectDesc desc);
 static void external_senddata(URL_FILE *extfile, CopyState pstate);
 static void external_scan_error_callback(void *arg);
 void readHeaderLine(CopyState pstate);
@@ -454,6 +454,17 @@ external_stopscan(FileScanDesc scan)
        }
 }
 
+/*     ----------------
+ *             external_getnext_init - prepare ExternalSelectDesc struct 
before external_getnext
+ *     ----------------
+ */
+ExternalSelectDesc
+external_getnext_init(PlanState *state) {
+       ExternalSelectDesc desc = (ExternalSelectDesc) 
palloc0(sizeof(ExternalSelectDescData));
+       if (state != NULL)
+               desc->projInfo = state->ps_ProjInfo;
+       return desc;
+}
 
 /* ----------------------------------------------------------------
 *              external_getnext
@@ -462,7 +473,7 @@ external_stopscan(FileScanDesc scan)
 * ----------------------------------------------------------------
 */
 HeapTuple
-external_getnext(FileScanDesc scan, ScanDirection direction)
+external_getnext(FileScanDesc scan, ScanDirection direction, 
ExternalSelectDesc desc)
 {
        HeapTuple       tuple;
 
@@ -486,7 +497,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction)
        /* Note: no locking manipulations needed */
        FILEDEBUG_1;
 
-       tuple = externalgettup(scan, direction);
+       tuple = externalgettup(scan, direction, desc);
 
 
        if (tuple == NULL)
@@ -969,7 +980,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
 }
 
 static HeapTuple
-externalgettup_defined(FileScanDesc scan)
+externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
 {
                HeapTuple       tuple = NULL;
                CopyState       pstate = scan->fs_pstate;
@@ -981,7 +992,7 @@ externalgettup_defined(FileScanDesc scan)
                        /* need to fill our buffer with data? */
                        if (pstate->raw_buf_done)
                        {
-                           pstate->bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE);
+                           pstate->bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
                                pstate->begloc = pstate->raw_buf;
                                pstate->raw_buf_done = (pstate->bytesread==0);
                                pstate->raw_buf_index = 0;
@@ -1072,7 +1083,7 @@ externalgettup_defined(FileScanDesc scan)
 }
 
 static HeapTuple
-externalgettup_custom(FileScanDesc scan)
+externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
 {
                HeapTuple       tuple;
                CopyState               pstate = scan->fs_pstate;
@@ -1088,7 +1099,7 @@ externalgettup_custom(FileScanDesc scan)
                        /* need to fill our buffer with data? */
                        if (pstate->raw_buf_done)
                        {
-                               int      bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE);
+                               int      bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
                                if ( bytesread > 0 )
                                        
appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
                                pstate->raw_buf_done = false;
@@ -1230,7 +1241,7 @@ externalgettup_custom(FileScanDesc scan)
 */
 static HeapTuple
 externalgettup(FileScanDesc scan,
-                   ScanDirection dir __attribute__((unused)))
+                   ScanDirection dir __attribute__((unused)), 
ExternalSelectDesc desc)
 {
 
        CopyState       pstate = scan->fs_pstate;
@@ -1252,9 +1263,9 @@ externalgettup(FileScanDesc scan,
        }
 
        if (!custom)
-               return externalgettup_defined(scan); /* text/csv */
+               return externalgettup_defined(scan, desc); /* text/csv */
        else
-               return externalgettup_custom(scan);  /* custom   */
+               return externalgettup_custom(scan, desc);  /* custom   */
 
 }
 /*
@@ -1747,7 +1758,7 @@ close_external_source(FILE *dataSource, bool failOnError, 
const char *relname)
  * get a chunk of data from the external data file.
  */
 static int
-external_getdata(URL_FILE *extfile, CopyState pstate, int maxread)
+external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, 
ExternalSelectDesc desc)
 {
        int                     bytesread = 0;
 
@@ -1759,7 +1770,7 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int 
maxread)
        */
 
 
-       bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, 
pstate);
+       bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, 
pstate, desc);
 
        if (url_feof(extfile, bytesread))
        {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/pxfheaders.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c
index 579e705..e653d30 100644
--- a/src/backend/access/external/pxfheaders.c
+++ b/src/backend/access/external/pxfheaders.c
@@ -34,6 +34,7 @@ static void add_location_options_httpheader(CHURL_HEADERS 
headers, GPHDUri *gphd
 static char* prepend_x_gp(const char* key);
 static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData 
*inputData);
 static void add_remote_credentials(CHURL_HEADERS headers);
+static void add_projection_desc_httpheader(CHURL_HEADERS headers, 
ProjectionInfo *projInfo);
 
 /* 
  * Add key/value pairs to connection header. 
@@ -47,6 +48,7 @@ void build_http_header(PxfInputData *input)
        GPHDUri *gphduri = input->gphduri;
        Relation rel = input->rel;
        char *filterstr = input->filterstr;
+       ProjectionInfo *proj_info = input->proj_info;
        
        if (rel != NULL)
        {
@@ -60,6 +62,11 @@ void build_http_header(PxfInputData *input)
                add_tuple_desc_httpheader(headers, rel);
        }
        
+       if (proj_info != NULL && proj_info->pi_isVarList) /* pi_varNumbers, indexed by add_projection_desc_httpheader, is only filled for all-Var target lists */
+       {
+               add_projection_desc_httpheader(headers, proj_info);
+       }
+
        /* GP cluster configuration */
        external_set_env_vars(&ev, gphduri->uri, false, NULL, NULL, false, 0);
        
@@ -123,7 +130,8 @@ static void add_alignment_size_httpheader(CHURL_HEADERS 
headers)
  */
 static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
 {      
-    char long_number[32];      
+    char long_number[sizeof(int32) * 8];
+
     StringInfoData formatter;  
     TupleDesc tuple;           
     initStringInfo(&formatter);
@@ -133,7 +141,7 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS 
headers, Relation rel)
        
     /* Convert the number of attributes to a string */ 
     pg_ltoa(tuple->natts, long_number);        
-    churl_headers_append(headers, "X-GP-ATTRS", long_number);  
+    churl_headers_append(headers, "X-GP-ATTRS", long_number);
        
     /* Iterate attributes */   
     for (int i = 0; i < tuple->natts; ++i)             
@@ -158,6 +166,29 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS 
headers, Relation rel)
        pfree(formatter.data);
 }
 
+static void add_projection_desc_httpheader(CHURL_HEADERS headers, 
ProjectionInfo *projInfo) {
+    int i;
+    char long_number[sizeof(int32) * 8];
+    int *varNumbers = projInfo->pi_varNumbers;
+    StringInfoData formatter;
+    initStringInfo(&formatter);
+
+    /* Convert the number of projection columns to a string */
+    pg_ltoa(list_length(projInfo->pi_targetlist), long_number);
+    churl_headers_append(headers, "X-GP-ATTRS-PROJ", long_number);
+
+    for(i = 0; i < list_length(projInfo->pi_targetlist); i++) {
+        int number = varNumbers[i] - 1;
+        pg_ltoa(number, long_number);
+        resetStringInfo(&formatter);
+        appendStringInfo(&formatter, "X-GP-ATTRS-PROJ-IDX");
+
+        churl_headers_append(headers, formatter.data,long_number);
+    }
+
+    pfree(formatter.data);
+}
+
 /* 
  * The options in the LOCATION statement of "create extenal table"
  * FRAGMENTER=HdfsDataFragmenter&ACCESSOR=SequenceFileAccessor... 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/url.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c
index 62a7ce0..97cdb92 100644
--- a/src/backend/access/external/url.c
+++ b/src/backend/access/external/url.c
@@ -86,7 +86,8 @@ static int32  InvokeExtProtocol(void          *ptr,
                                                                size_t          
nbytes, 
                                                                URL_FILE        
*file, 
                                                                CopyState       
pstate,
-                                                               bool            
last_call);
+                                                               bool            
last_call,
+                                                               
ExternalSelectDesc desc);
 void extract_http_domain(char* i_path, char* o_domain, int dlen);
 
 
@@ -1270,7 +1271,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char 
*relname)
                        
                        /* last call. let the user close custom resources */
                        if(file->u.custom.protocol_udf)
-                               (void) InvokeExtProtocol(NULL, 0, file, NULL, 
true);
+                               (void) InvokeExtProtocol(NULL, 0, file, NULL, 
true, NULL);
 
                        /* now clean up everything not cleaned by user */
                        MemoryContextDelete(file->u.custom.protcxt);
@@ -1774,7 +1775,7 @@ static size_t curl_fwrite(char *buf, int nbytes, 
URL_FILE* file, CopyState pstat
 
 
 size_t
-url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState 
pstate)
+url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState 
pstate, ExternalSelectDesc desc)
 {
     size_t     want;
        int     n;
@@ -1821,7 +1822,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE 
*file, CopyState pstate
 
                case CFTYPE_CUSTOM:
                        
-                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false);
+                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false, desc);
                        break;
                                
                default: /* unknown or supported type */
@@ -1859,7 +1860,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE 
*file, CopyState pstat
                
                case CFTYPE_CUSTOM:
                                                
-                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false);
+                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false, NULL);
                        break;
                        
                default: /* unknown or unsupported type */
@@ -2296,7 +2297,8 @@ InvokeExtProtocol(void                    *ptr,
                                  size_t                 nbytes, 
                                  URL_FILE              *file, 
                                  CopyState      pstate,
-                                 bool                   last_call)
+                                 bool                   last_call,
+                                 ExternalSelectDesc desc)
 {
        FunctionCallInfoData    fcinfo;
        ExtProtocolData*                extprotocol = 
file->u.custom.extprotocol;
@@ -2314,6 +2316,7 @@ InvokeExtProtocol(void                    *ptr,
        extprotocol->prot_maxbytes = nbytes;
        extprotocol->prot_scanquals = file->u.custom.scanquals;
        extprotocol->prot_last_call = last_call;
+       extprotocol->desc = desc;
        
        InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, 
                                                         /* FmgrInfo */ 
extprotocol_udf, 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/executor/nodeExternalscan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index b19b25e..2831faa 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -66,6 +66,7 @@ ExternalNext(ExternalScanState *node)
        EState     *estate;
        ScanDirection direction;
        TupleTableSlot *slot;
+       ExternalSelectDesc externalSelectDesc;
 
        /*
         * get information from the estate and scan state
@@ -79,7 +80,8 @@ ExternalNext(ExternalScanState *node)
        /*
         * get the next tuple from the file access methods
         */
-       tuple = external_getnext(scandesc, direction);
+       externalSelectDesc = external_getnext_init(&(node->ss.ps));
+       tuple = external_getnext(scandesc, direction, externalSelectDesc);
 
        /*
         * save the tuple and the buffer returned to us by the access methods in
@@ -113,7 +115,7 @@ ExternalNext(ExternalScanState *node)
                        ExecEagerFreeExternalScan(node);
                }
        }
-
+       pfree(externalSelectDesc);
 
        return slot;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 011b0fc..465c38d 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -68,6 +68,7 @@ void  free_token_resources(PxfInputData *inputData);
 Datum gpbridge_import(PG_FUNCTION_ARGS)
 {
        gpbridge_check_inside_extproto(fcinfo, "gpbridge_import");
+//     ExternalSelectDesc desc = EXTPROTOCOL_GET_SELECTDESC(fcinfo);
 
        if (gpbridge_last_call(fcinfo))
                PG_RETURN_INT32(gpbridge_cleanup(fcinfo));
@@ -181,6 +182,7 @@ void add_querydata_to_http_header(gphadoop_context* 
context, PG_FUNCTION_ARGS)
        inputData.gphduri = context->gphd_uri;
        inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
        inputData.filterstr = 
serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
+       inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo);
        add_delegation_token(&inputData);
        
        build_http_header(&inputData);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index d70c844..ca7d492 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -33,6 +33,8 @@
 
 /* ------------------------- I/O function API -----------------------------*/
 
+struct ExternalSelectDescData;
+typedef struct ExternalSelectDescData *ExternalSelectDesc;
 /*
  * ExtProtocolData is the node type that is passed as fmgr "context" info
  * when a function is called by the External Table protocol manager.
@@ -47,6 +49,7 @@ typedef struct ExtProtocolData
        void               *prot_user_ctx;
        bool                    prot_last_call;
        List               *prot_scanquals;
+       ExternalSelectDesc desc;
 
 } ExtProtocolData;
 
@@ -61,13 +64,13 @@ typedef ExtProtocolData *ExtProtocol;
 #define EXTPROTOCOL_GET_DATALEN(fcinfo)    (((ExtProtocolData*) 
fcinfo->context)->prot_maxbytes)
 #define EXTPROTOCOL_GET_SCANQUALS(fcinfo)    (((ExtProtocolData*) 
fcinfo->context)->prot_scanquals)
 #define EXTPROTOCOL_GET_USER_CTX(fcinfo)   (((ExtProtocolData*) 
fcinfo->context)->prot_user_ctx)
+#define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) (fcinfo)->context)->desc != NULL ? ((ExtProtocolData*) (fcinfo)->context)->desc->projInfo : NULL) /* desc is NULL for url_fwrite and the url_fclose last call */
 #define EXTPROTOCOL_IS_LAST_CALL(fcinfo)   (((ExtProtocolData*) 
fcinfo->context)->prot_last_call)
 
 #define EXTPROTOCOL_SET_LAST_CALL(fcinfo)  (((ExtProtocolData*) 
fcinfo->context)->prot_last_call = true)
 #define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \
        (((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p)
 
-
 /* ------------------------- Validator function API 
-----------------------------*/
 
 typedef enum ValidatorDirection

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 145cb84..5a5f532 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -63,6 +63,17 @@ typedef struct ExternalInsertDescData
 
 typedef ExternalInsertDescData *ExternalInsertDesc;
 
+/*
+ * ExternalSelectDescData is used for storing state related
+ * to selecting data from an external table.
+ */
+typedef struct ExternalSelectDescData
+{
+       ProjectionInfo *projInfo;
+} ExternalSelectDescData;
+
+typedef ExternalSelectDescData *ExternalSelectDesc;
+
 typedef enum DataLineStatus
 {
        LINE_OK,
@@ -80,7 +91,8 @@ extern FileScanDesc external_beginscan(Relation relation, 
Index scanrelid,
 extern void external_rescan(FileScanDesc scan);
 extern void external_endscan(FileScanDesc scan);
 extern void external_stopscan(FileScanDesc scan);
-extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction);
+extern ExternalSelectDesc external_getnext_init(PlanState *state);
+extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, 
ExternalSelectDesc desc);
 extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
 extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
 extern void external_insert_finish(ExternalInsertDesc extInsertDesc);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/pxfheaders.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h
index 13853ab..da3da7f 100644
--- a/src/include/access/pxfheaders.h
+++ b/src/include/access/pxfheaders.h
@@ -44,6 +44,7 @@ typedef struct sPxfInputData
        Relation                rel;
        char                    *filterstr;
        PxfHdfsToken    token;
+       ProjectionInfo  *proj_info;
 } PxfInputData;
 
 void build_http_header(PxfInputData *input);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/url.h
----------------------------------------------------------------------
diff --git a/src/include/access/url.h b/src/include/access/url.h
index d837b68..9de492d 100644
--- a/src/include/access/url.h
+++ b/src/include/access/url.h
@@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, 
extvar_t *ev, CopyState pst
 extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname);
 extern bool url_feof(URL_FILE *file, int bytesread);
 extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
-extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, 
CopyState pstate);
+extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, 
CopyState pstate, ExternalSelectDesc desc);
 extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, 
CopyState pstate);
 extern void url_rewind(URL_FILE *file, const char *relname);
 extern void url_fflush(URL_FILE *file, CopyState pstate);

Reply via email to