Repository: incubator-hawq
Updated Branches:
  refs/heads/master 2f95286e0 -> 24d4d967e
HAWQ-927. Pass ProjectionInfo data to PXF This commit makes the necessary modifications to the HAWQ side of the codebase to add a list of indices of projected columns Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/24d4d967 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/24d4d967 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/24d4d967 Branch: refs/heads/master Commit: 24d4d967e767fc640ad6ab3c91b2ce26bb96de66 Parents: 2f95286 Author: Kavinder Dhaliwal <[email protected]> Authored: Mon Jul 11 18:20:16 2016 -0700 Committer: Kavinder Dhaliwal <[email protected]> Committed: Wed Jul 20 16:36:08 2016 -0700 ---------------------------------------------------------------------- src/backend/access/external/fileam.c | 37 +++++++++++++++++---------- src/backend/access/external/pxfheaders.c | 35 +++++++++++++++++++++++-- src/backend/access/external/url.c | 15 ++++++----- src/backend/executor/nodeExternalscan.c | 6 +++-- src/bin/gpfusion/gpbridgeapi.c | 2 ++ src/include/access/extprotocol.h | 5 +++- src/include/access/fileam.h | 14 +++++++++- src/include/access/pxfheaders.h | 1 + src/include/access/url.h | 2 +- 9 files changed, 91 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/fileam.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c index 645e6dc..70a115a 100644 --- a/src/backend/access/external/fileam.c +++ b/src/backend/access/external/fileam.c @@ -79,7 +79,7 @@ #include "cdb/cdbutil.h" #include "cdb/cdbvars.h" -static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir); +static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc); static void 
InitParseState(CopyState pstate, Relation relation, Datum* values, bool* nulls, bool writable, List *fmtOpts, char fmtType, @@ -97,7 +97,7 @@ static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo, static void open_external_readable_source(FileScanDesc scan); static void open_external_writable_source(ExternalInsertDesc extInsertDesc); -static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread); +static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc); static void external_senddata(URL_FILE *extfile, CopyState pstate); static void external_scan_error_callback(void *arg); void readHeaderLine(CopyState pstate); @@ -454,6 +454,17 @@ external_stopscan(FileScanDesc scan) } } +/* ---------------- + * external_getnext_init - prepare ExternalSelectDesc struct before external_getnext + * ---------------- + */ +ExternalSelectDesc +external_getnext_init(PlanState *state) { + ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData)); + if (state != NULL) + desc->projInfo = state->ps_ProjInfo; + return desc; +} /* ---------------------------------------------------------------- * external_getnext @@ -462,7 +473,7 @@ external_stopscan(FileScanDesc scan) * ---------------------------------------------------------------- */ HeapTuple -external_getnext(FileScanDesc scan, ScanDirection direction) +external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc) { HeapTuple tuple; @@ -486,7 +497,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction) /* Note: no locking manipulations needed */ FILEDEBUG_1; - tuple = externalgettup(scan, direction); + tuple = externalgettup(scan, direction, desc); if (tuple == NULL) @@ -969,7 +980,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan) } static HeapTuple -externalgettup_defined(FileScanDesc scan) +externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc) { HeapTuple tuple = NULL; 
CopyState pstate = scan->fs_pstate; @@ -981,7 +992,7 @@ externalgettup_defined(FileScanDesc scan) /* need to fill our buffer with data? */ if (pstate->raw_buf_done) { - pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE); + pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc); pstate->begloc = pstate->raw_buf; pstate->raw_buf_done = (pstate->bytesread==0); pstate->raw_buf_index = 0; @@ -1072,7 +1083,7 @@ externalgettup_defined(FileScanDesc scan) } static HeapTuple -externalgettup_custom(FileScanDesc scan) +externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc) { HeapTuple tuple; CopyState pstate = scan->fs_pstate; @@ -1088,7 +1099,7 @@ externalgettup_custom(FileScanDesc scan) /* need to fill our buffer with data? */ if (pstate->raw_buf_done) { - int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE); + int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc); if ( bytesread > 0 ) appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread); pstate->raw_buf_done = false; @@ -1230,7 +1241,7 @@ externalgettup_custom(FileScanDesc scan) */ static HeapTuple externalgettup(FileScanDesc scan, - ScanDirection dir __attribute__((unused))) + ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc) { CopyState pstate = scan->fs_pstate; @@ -1252,9 +1263,9 @@ externalgettup(FileScanDesc scan, } if (!custom) - return externalgettup_defined(scan); /* text/csv */ + return externalgettup_defined(scan, desc); /* text/csv */ else - return externalgettup_custom(scan); /* custom */ + return externalgettup_custom(scan, desc); /* custom */ } /* @@ -1747,7 +1758,7 @@ close_external_source(FILE *dataSource, bool failOnError, const char *relname) * get a chunk of data from the external data file. 
*/ static int -external_getdata(URL_FILE *extfile, CopyState pstate, int maxread) +external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc) { int bytesread = 0; @@ -1759,7 +1770,7 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int maxread) */ - bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate); + bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc); if (url_feof(extfile, bytesread)) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/pxfheaders.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c index 579e705..e653d30 100644 --- a/src/backend/access/external/pxfheaders.c +++ b/src/backend/access/external/pxfheaders.c @@ -34,6 +34,7 @@ static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphd static char* prepend_x_gp(const char* key); static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData *inputData); static void add_remote_credentials(CHURL_HEADERS headers); +static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo); /* * Add key/value pairs to connection header. 
@@ -47,6 +48,7 @@ void build_http_header(PxfInputData *input) GPHDUri *gphduri = input->gphduri; Relation rel = input->rel; char *filterstr = input->filterstr; + ProjectionInfo *proj_info = input->proj_info; if (rel != NULL) { @@ -60,6 +62,11 @@ void build_http_header(PxfInputData *input) add_tuple_desc_httpheader(headers, rel); } + if (proj_info != NULL) + { + add_projection_desc_httpheader(headers, proj_info); + } + /* GP cluster configuration */ external_set_env_vars(&ev, gphduri->uri, false, NULL, NULL, false, 0); @@ -123,7 +130,8 @@ static void add_alignment_size_httpheader(CHURL_HEADERS headers) */ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel) { - char long_number[32]; + char long_number[sizeof(int32) * 8]; + StringInfoData formatter; TupleDesc tuple; initStringInfo(&formatter); @@ -133,7 +141,7 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel) /* Convert the number of attributes to a string */ pg_ltoa(tuple->natts, long_number); - churl_headers_append(headers, "X-GP-ATTRS", long_number); + churl_headers_append(headers, "X-GP-ATTRS", long_number); /* Iterate attributes */ for (int i = 0; i < tuple->natts; ++i) @@ -158,6 +166,29 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel) pfree(formatter.data); } +static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo) { + int i; + char long_number[sizeof(int32) * 8]; + int *varNumbers = projInfo->pi_varNumbers; + StringInfoData formatter; + initStringInfo(&formatter); + + /* Convert the number of projection columns to a string */ + pg_ltoa(list_length(projInfo->pi_targetlist), long_number); + churl_headers_append(headers, "X-GP-ATTRS-PROJ", long_number); + + for(i = 0; i < list_length(projInfo->pi_targetlist); i++) { + int number = varNumbers[i] - 1; + pg_ltoa(number, long_number); + resetStringInfo(&formatter); + appendStringInfo(&formatter, "X-GP-ATTRS-PROJ-IDX"); + + churl_headers_append(headers, 
formatter.data,long_number); + } + + pfree(formatter.data); +} + /* * The options in the LOCATION statement of "create extenal table" * FRAGMENTER=HdfsDataFragmenter&ACCESSOR=SequenceFileAccessor... http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/url.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c index 62a7ce0..97cdb92 100644 --- a/src/backend/access/external/url.c +++ b/src/backend/access/external/url.c @@ -86,7 +86,8 @@ static int32 InvokeExtProtocol(void *ptr, size_t nbytes, URL_FILE *file, CopyState pstate, - bool last_call); + bool last_call, + ExternalSelectDesc desc); void extract_http_domain(char* i_path, char* o_domain, int dlen); @@ -1270,7 +1271,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char *relname) /* last call. let the user close custom resources */ if(file->u.custom.protocol_udf) - (void) InvokeExtProtocol(NULL, 0, file, NULL, true); + (void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL); /* now clean up everything not cleaned by user */ MemoryContextDelete(file->u.custom.protcxt); @@ -1774,7 +1775,7 @@ static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstat size_t -url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate) +url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc) { size_t want; int n; @@ -1821,7 +1822,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate case CFTYPE_CUSTOM: - want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false); + want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc); break; default: /* unknown or supported type */ @@ -1859,7 +1860,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstat case CFTYPE_CUSTOM: - want = (size_t) 
InvokeExtProtocol(ptr, nmemb * size, file, pstate, false); + want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL); break; default: /* unknown or unsupported type */ @@ -2296,7 +2297,8 @@ InvokeExtProtocol(void *ptr, size_t nbytes, URL_FILE *file, CopyState pstate, - bool last_call) + bool last_call, + ExternalSelectDesc desc) { FunctionCallInfoData fcinfo; ExtProtocolData* extprotocol = file->u.custom.extprotocol; @@ -2314,6 +2316,7 @@ InvokeExtProtocol(void *ptr, extprotocol->prot_maxbytes = nbytes; extprotocol->prot_scanquals = file->u.custom.scanquals; extprotocol->prot_last_call = last_call; + extprotocol->desc = desc; InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, /* FmgrInfo */ extprotocol_udf, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/executor/nodeExternalscan.c ---------------------------------------------------------------------- diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c index b19b25e..2831faa 100644 --- a/src/backend/executor/nodeExternalscan.c +++ b/src/backend/executor/nodeExternalscan.c @@ -66,6 +66,7 @@ ExternalNext(ExternalScanState *node) EState *estate; ScanDirection direction; TupleTableSlot *slot; + ExternalSelectDesc externalSelectDesc; /* * get information from the estate and scan state @@ -79,7 +80,8 @@ ExternalNext(ExternalScanState *node) /* * get the next tuple from the file access methods */ - tuple = external_getnext(scandesc, direction); + externalSelectDesc = external_getnext_init(&(node->ss.ps)); + tuple = external_getnext(scandesc, direction, externalSelectDesc); /* * save the tuple and the buffer returned to us by the access methods in @@ -113,7 +115,7 @@ ExternalNext(ExternalScanState *node) ExecEagerFreeExternalScan(node); } } - + pfree(externalSelectDesc); return slot; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/bin/gpfusion/gpbridgeapi.c 
---------------------------------------------------------------------- diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c index 011b0fc..465c38d 100644 --- a/src/bin/gpfusion/gpbridgeapi.c +++ b/src/bin/gpfusion/gpbridgeapi.c @@ -68,6 +68,7 @@ void free_token_resources(PxfInputData *inputData); Datum gpbridge_import(PG_FUNCTION_ARGS) { gpbridge_check_inside_extproto(fcinfo, "gpbridge_import"); +// ExternalSelectDesc desc = EXTPROTOCOL_GET_SELECTDESC(fcinfo); if (gpbridge_last_call(fcinfo)) PG_RETURN_INT32(gpbridge_cleanup(fcinfo)); @@ -181,6 +182,7 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS) inputData.gphduri = context->gphd_uri; inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo); inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo)); + inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo); add_delegation_token(&inputData); build_http_header(&inputData); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/extprotocol.h ---------------------------------------------------------------------- diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h index d70c844..ca7d492 100644 --- a/src/include/access/extprotocol.h +++ b/src/include/access/extprotocol.h @@ -33,6 +33,8 @@ /* ------------------------- I/O function API -----------------------------*/ +struct ExternalSelectDescData; +typedef struct ExternalSelectDescData *ExternalSelectDesc; /* * ExtProtocolData is the node type that is passed as fmgr "context" info * when a function is called by the External Table protocol manager. 
@@ -47,6 +49,7 @@ typedef struct ExtProtocolData void *prot_user_ctx; bool prot_last_call; List *prot_scanquals; + ExternalSelectDesc desc; } ExtProtocolData; @@ -61,13 +64,13 @@ typedef ExtProtocolData *ExtProtocol; #define EXTPROTOCOL_GET_DATALEN(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_maxbytes) #define EXTPROTOCOL_GET_SCANQUALS(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_scanquals) #define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx) +#define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo) #define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call) #define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true) #define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \ (((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p) - /* ------------------------- Validator function API -----------------------------*/ typedef enum ValidatorDirection http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/fileam.h ---------------------------------------------------------------------- diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h index 145cb84..5a5f532 100644 --- a/src/include/access/fileam.h +++ b/src/include/access/fileam.h @@ -63,6 +63,17 @@ typedef struct ExternalInsertDescData typedef ExternalInsertDescData *ExternalInsertDesc; +/* + * ExternalSelectDescData is used for storing state related + * to selecting data from an external table. 
+ */ +typedef struct ExternalSelectDescData +{ + ProjectionInfo *projInfo; +} ExternalSelectDescData; + +typedef ExternalSelectDescData *ExternalSelectDesc; + typedef enum DataLineStatus { LINE_OK, @@ -80,7 +91,8 @@ extern FileScanDesc external_beginscan(Relation relation, Index scanrelid, extern void external_rescan(FileScanDesc scan); extern void external_endscan(FileScanDesc scan); extern void external_stopscan(FileScanDesc scan); -extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction); +extern ExternalSelectDesc external_getnext_init(PlanState *state); +extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc); extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno); extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup); extern void external_insert_finish(ExternalInsertDesc extInsertDesc); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/pxfheaders.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h index 13853ab..da3da7f 100644 --- a/src/include/access/pxfheaders.h +++ b/src/include/access/pxfheaders.h @@ -44,6 +44,7 @@ typedef struct sPxfInputData Relation rel; char *filterstr; PxfHdfsToken token; + ProjectionInfo *proj_info; } PxfInputData; void build_http_header(PxfInputData *input); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/url.h ---------------------------------------------------------------------- diff --git a/src/include/access/url.h b/src/include/access/url.h index d837b68..9de492d 100644 --- a/src/include/access/url.h +++ b/src/include/access/url.h @@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pst extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname); extern bool url_feof(URL_FILE *file, 
int bytesread); extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen); -extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate); +extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc); extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate); extern void url_rewind(URL_FILE *file, const char *relname); extern void url_fflush(URL_FILE *file, CopyState pstate);
