Repository: incubator-hawq Updated Branches: refs/heads/master 8c69fa50f -> aa7a5d2ad
HAWQ-967. Extend Projection info to include filter attributes. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/aa7a5d2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/aa7a5d2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/aa7a5d2a Branch: refs/heads/master Commit: aa7a5d2adc56c324f428978f5fc1864a1f00145b Parents: 8c69fa5 Author: Oleksandr Diachenko <[email protected]> Authored: Wed Aug 24 12:05:42 2016 -0700 Committer: Oleksandr Diachenko <[email protected]> Committed: Wed Aug 24 12:05:42 2016 -0700 ---------------------------------------------------------------------- src/backend/access/external/pxffilters.c | 106 ++++++++++++++++++++++++++ src/backend/access/external/pxfheaders.c | 26 ++++++- src/bin/gpfusion/gpbridgeapi.c | 1 + src/include/access/pxffilters.h | 1 + src/include/access/pxfheaders.h | 1 + 5 files changed, 131 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/aa7a5d2a/src/backend/access/external/pxffilters.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxffilters.c b/src/backend/access/external/pxffilters.c index f219738..405e6d7 100644 --- a/src/backend/access/external/pxffilters.c +++ b/src/backend/access/external/pxffilters.c @@ -38,6 +38,7 @@ static char* pxf_serialize_filter_list(List *filters); static bool opexpr_to_pxffilter(OpExpr *expr, PxfFilterDesc *filter); static bool supported_filter_type(Oid type); static void const_to_str(Const *constval, StringInfo buf); +static List* append_attr_from_var(Var* var, List* attrs); /* * All supported HAWQ operators, and their respective HFDS operator code. @@ -451,6 +452,51 @@ opexpr_to_pxffilter(OpExpr *expr, PxfFilterDesc *filter) return false; } +static List* +append_attr_from_var(Var* var, List* attrs) +{ + AttrNumber varattno = var->varattno; + /* system attr not supported */ + if (varattno > InvalidAttrNumber) + return lappend_int(attrs, varattno - 1); + + return attrs; +} + +static List* +get_attrs_from_expr(Expr *expr) +{ + Node *leftop = NULL; + Node *rightop = NULL; + List *attrs = NIL; + + if ((!expr)) + return attrs; + + if (IsA(expr, OpExpr)) + { + leftop = get_leftop(expr); + rightop = get_rightop(expr); + } else if (IsA(expr, ScalarArrayOpExpr)) + { + ScalarArrayOpExpr *saop = (ScalarArrayOpExpr *) expr; + leftop = (Node *) linitial(saop->args); + rightop = (Node *) lsecond(saop->args); + } + + if (IsA(leftop, Var)) + { + attrs = append_attr_from_var((Var *) leftop, attrs); + } + if (IsA(leftop, Const)) + { + attrs = append_attr_from_var((Var *) rightop, attrs); + } + + return attrs; + +} + /* * supported_filter_type * @@ -536,6 +582,7 @@ const_to_str(Const *constval, StringInfo buf) pfree(extval); } + /* * serializePxfFilterQuals * @@ -563,3 +610,62 @@ char *serializePxfFilterQuals(List *quals) return result; } + +/* + * Returns a list of attributes, extracted from quals. + * Supports AND, OR, NOT operations. + * Supports =, <, <=, >, >=, IS NULL, IS NOT NULL, BETWEEN, IN operators. + * List might contain duplicates. + * Caller should release memory once result is not needed. + */ +List* extractPxfAttributes(List* quals) +{ + + ListCell *lc = NULL; + List *attributes = NIL; + + if (list_length(quals) == 0) + return NIL; + + foreach (lc, quals) + { + Node *node = (Node *) lfirst(lc); + NodeTag tag = nodeTag(node); + + switch (tag) + { + case T_OpExpr: + case T_ScalarArrayOpExpr: + { + Expr* expr = (Expr *) node; + List *attrs = get_attrs_from_expr(expr); + attributes = list_concat(attributes, attrs); + break; + } + case T_BoolExpr: + { + BoolExpr* expr = (BoolExpr *) node; + List *inner_result = extractPxfAttributes(expr->args); + attributes = list_concat(attributes, inner_result); + break; + } + case T_NullTest: + { + NullTest* expr = (NullTest *) node; + attributes = append_attr_from_var((Var *) expr->arg, attributes); + break; + } + default: + /* + * tag is not supported, it's risk of having: + * 1) false-positive tuples + * 2) unable to join tables + * 3) etc + */ + elog(ERROR, "extractPxfAttributes: unsupported node tag %d, unable to extract attribute from qualifier", tag); + break; + } + } + + return attributes; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/aa7a5d2a/src/backend/access/external/pxfheaders.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c index 45fcc35..49dd966 100644 --- a/src/backend/access/external/pxfheaders.c +++ b/src/backend/access/external/pxfheaders.c @@ -26,6 +26,7 @@ #include "catalog/namespace.h" #include "catalog/pg_exttable.h" #include "access/pxfheaders.h" +#include "access/pxffilters.h" #include "utils/guc.h" static void add_alignment_size_httpheader(CHURL_HEADERS headers); @@ -34,7 +35,7 @@ static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphd static char* prepend_x_gp(const char* key); static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData *inputData); static void add_remote_credentials(CHURL_HEADERS headers); -static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo); +static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo, List *qualsAttributes); /* * Add key/value pairs to connection header. @@ -64,7 +65,9 @@ void build_http_header(PxfInputData *input) if (proj_info != NULL && proj_info->pi_isVarList) { - add_projection_desc_httpheader(headers, proj_info); + List* qualsAttributes = extractPxfAttributes(input->quals); + + add_projection_desc_httpheader(headers, proj_info, qualsAttributes); } /* GP cluster configuration */ @@ -166,7 +169,7 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel) pfree(formatter.data); } -static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo) { +static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo, List *qualsAttributes) { int i; char long_number[sizeof(int32) * 8]; int *varNumbers = projInfo->pi_varNumbers; @@ -174,7 +177,7 @@ static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo initStringInfo(&formatter); /* Convert the number of projection columns to a string */ - pg_ltoa(list_length(projInfo->pi_targetlist), long_number); + pg_ltoa(list_length(projInfo->pi_targetlist) + list_length(qualsAttributes), long_number); churl_headers_append(headers, "X-GP-ATTRS-PROJ", long_number); for(i = 0; i < list_length(projInfo->pi_targetlist); i++) { @@ -186,6 +189,21 @@ static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo churl_headers_append(headers, formatter.data,long_number); } + ListCell *attribute = NULL; + + foreach(attribute, qualsAttributes) + { + AttrNumber attrNumber = lfirst_int(attribute); + + pg_ltoa(attrNumber, long_number); + resetStringInfo(&formatter); + appendStringInfo(&formatter, "X-GP-ATTRS-PROJ-IDX"); + + churl_headers_append(headers, formatter.data,long_number); + } + + + list_free(qualsAttributes); pfree(formatter.data); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/aa7a5d2a/src/bin/gpfusion/gpbridgeapi.c ---------------------------------------------------------------------- diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c index 94c6b7d..2751b1b 100644 --- a/src/bin/gpfusion/gpbridgeapi.c +++ b/src/bin/gpfusion/gpbridgeapi.c @@ -181,6 +181,7 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS) inputData.headers = context->churl_headers; inputData.gphduri = context->gphd_uri; inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo); + inputData.quals = EXTPROTOCOL_GET_SCANQUALS(fcinfo); inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo)); if (EXTPROTOCOL_GET_SELECTDESC(fcinfo)) inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/aa7a5d2a/src/include/access/pxffilters.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxffilters.h b/src/include/access/pxffilters.h index 358cd8b..1409e39 100644 --- a/src/include/access/pxffilters.h +++ b/src/include/access/pxffilters.h @@ -102,5 +102,6 @@ static inline bool pxfoperand_is_const(PxfOperand x) } char *serializePxfFilterQuals(List *quals); +List* extractPxfAttributes(List* quals); #endif // _PXF_FILTERS_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/aa7a5d2a/src/include/access/pxfheaders.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h index da3da7f..410a077 100644 --- a/src/include/access/pxfheaders.h +++ b/src/include/access/pxfheaders.h @@ -45,6 +45,7 @@ typedef struct sPxfInputData char *filterstr; PxfHdfsToken token; ProjectionInfo *proj_info; + List *quals; } PxfInputData; void build_http_header(PxfInputData *input);
