This is an automated email from the ASF dual-hosted git repository. huor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hawq.git
commit f62d3f4f2f9eb6a0c9e9ab15af3b7acd1eade971 Author: oushu1tuyu1 <[email protected]> AuthorDate: Mon Apr 29 11:35:24 2019 +0800 HAWQ-1710. Add ORC reader implement in hawq --- contrib/orc/orc.c | 875 ++++++++++++++++++++++++++++++++++++++++++++--- contrib/orc/orc_init.sql | 4 +- 2 files changed, 834 insertions(+), 45 deletions(-) diff --git a/contrib/orc/orc.c b/contrib/orc/orc.c index 0d03813..8109e62 100644 --- a/contrib/orc/orc.c +++ b/contrib/orc/orc.c @@ -32,6 +32,7 @@ #include "storage/cwrapper/orc-format-c.h" #include "storage/cwrapper/hdfs-file-system-c.h" #include "cdb/cdbvars.h" + #define ORC_TIMESTAMP_EPOCH_JDATE 2457024 /* == date2j(2015, 1, 1) */ #define MAX_ORC_ARRAY_DIMS 10000 #define ORC_NUMERIC_MAX_PRECISION 38 @@ -86,38 +87,40 @@ typedef struct ORCFormatUserData char **colNames; int *colDatatypes; int64_t *colDatatypeMods; - int numberOfColumns; + int32_t numberOfColumns; char **colRawValues; Datum *colValues; uint64_t *colValLength; bits8 **colValNullBitmap; int **colValDims; char **colAddresses; - bool *colIsNulls; bool *colToReads; - bool *colSpeedUpPossible; - bool *colSpeedUp; - - bool *nulls; - Datum *datums; - int reserved; - - TimestampType *colTimestamp; int nSplits; ORCFormatFileSplit *splits; + + // for write only + TimestampType *colTimestamp; } ORCFormatUserData; static FmgrInfo *get_orc_function(char *formatter_name, char *function_name); +static void get_scan_functions(FileScanDesc file_scan_desc); static void get_insert_functions(ExternalInsertDesc ext_insert_desc); +static void init_format_user_data_for_read(TupleDesc tup_desc, + ORCFormatUserData *user_data); static void init_format_user_data_for_write(TupleDesc tup_desc, ORCFormatUserData *user_data); static void build_options_in_json(List *fmt_opts_defelem, int encoding, char **json_str, TupleDesc tupDesc); static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem, int encoding, int segno, TupleDesc tupDesc); +static void build_file_splits(Uri *uri, ScanState *scan_state, + ORCFormatUserData *user_data); +static void build_tuple_descrition_for_read(Plan *plan, Relation relation, + ORCFormatUserData *user_data); static void build_tuple_descrition_for_write(Relation relation, ORCFormatUserData *user_data); +static void orc_scan_error_callback(void *arg); static void orc_parse_format_string(CopyState pstate, char *fmtstr); static char *orc_strtokx2(const char *s, const char *whitespace, const char *delim, const char *quote, char escape, bool e_strings, @@ -139,7 +142,7 @@ Datum orc_validate_interfaces(PG_FUNCTION_ARGS) if (pg_strncasecmp(psv_interface->format_name, "orc", strlen("orc")) != 0) { ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), errmsg("orc_validate_interface : incorrect format name \'%s\'", psv_interface->format_name))); + (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ORC: incorrect format name \'%s\'", psv_interface->format_name))); } PG_RETURN_VOID() ; @@ -259,7 +262,7 @@ Datum orc_validate_options(PG_FUNCTION_ARGS) && strncasecmp(key, "category", strlen("category"))) { ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Option \"%s\" for ORC table is invalid", key), errhint("Format options for ORC table must be either " "formatter, compresstype, bloomfilter or dicthreshold"), errOmitLocation(true))); + (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Option \"%s\" for ORC table is invalid", key), errOmitLocation(true))); } sprintf((char * ) format_str + len, "%s '%s' ", key, val); @@ -301,7 +304,7 @@ Datum orc_validate_encodings(PG_FUNCTION_ARGS) if (strncasecmp(encoding_name, "utf8", strlen("utf8"))) { ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding for ORC external table. " "Encoding for ORC external table must be UTF8.", encoding_name), errOmitLocation(true))); + (errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding for ORC external table. ", encoding_name), errOmitLocation(true))); } PG_RETURN_VOID() ; @@ -320,22 +323,676 @@ Datum orc_validate_datatypes(PG_FUNCTION_ARGS) { int32_t datatype = (int32_t) (((Form_pg_attribute) (tup_desc->attrs[i]))->atttypid); + int4 typmod = ((Form_pg_attribute) (tup_desc->attrs[i]))->atttypmod; if (checkORCUnsupportedDataType(datatype)) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unsupported data types %d for columns of external ORC table is specified.", datatype), errOmitLocation(true))); } + if (HAWQ_TYPE_NUMERIC == datatype) + { + int4 tmp_typmod = typmod - VARHDRSZ; + int precision = (tmp_typmod >> 16) & 0xffff; + int scale = tmp_typmod & 0xffff; + if (precision < 1 || 38 < precision) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("ORC DECIMAL precision must be between 1 and 38"))); + if (scale == 0) + ereport(NOTICE, (errmsg("Using a scale of zero for ORC DECIMAL"))); + } + } + + PG_RETURN_VOID() ; +} + +/* + * FileScanDesc + * orc_beginscan(ExternalScan *extScan, + * ScanState *scanState, + * Relation relation, + * int formatterType, + * char *formatterName) + */ +Datum orc_beginscan(PG_FUNCTION_ARGS) +{ + PlugStorage ps = (PlugStorage) (fcinfo->context); + ExternalScan *ext_scan = ps->ps_ext_scan; + ScanState *scan_state = ps->ps_scan_state; + Relation relation = ps->ps_relation; + int formatterType = ps->ps_formatter_type; + char *formatterName = ps->ps_formatter_name; + + Index scan_rel_id = ext_scan->scan.scanrelid; + uint32 scan_counter = ext_scan->scancounter; + List *uri_list = ext_scan->uriList; + List *fmt_opts = ext_scan->fmtOpts; + int fmt_encoding = ext_scan->encoding; + List *scan_quals = ext_scan->scan.plan.qual; + + /* 1. Increment relation reference count while scanning relation */ + /* + * This is just to make really sure the relcache entry won't go away while + * the scan has a pointer to it. Caller should be holding the rel open + * anyway, so this is redundant in all normal scenarios... + */ + RelationIncrementReferenceCount(relation); + + /* 2. Allocate and initialize the select descriptor */ + FileScanDesc file_scan_desc = palloc(sizeof(FileScanDescData)); + file_scan_desc->fs_inited = false; + file_scan_desc->fs_ctup.t_data = NULL; + ItemPointerSetInvalid(&file_scan_desc->fs_ctup.t_self); + file_scan_desc->fs_cbuf = InvalidBuffer; + file_scan_desc->fs_rd = relation; + file_scan_desc->fs_scanrelid = scan_rel_id; + file_scan_desc->fs_scancounter = scan_counter; + file_scan_desc->fs_scanquals = scan_quals; + file_scan_desc->fs_noop = false; + file_scan_desc->fs_file = NULL; + file_scan_desc->fs_formatter = NULL; + file_scan_desc->fs_formatter_type = formatterType; + file_scan_desc->fs_formatter_name = formatterName; + + /* 2.1 Setup scan functions */ + get_scan_functions(file_scan_desc); + + /* 2.2 Get URI for the scan */ + /* + * get the external URI assigned to us. + * + * The URI assigned for this segment is normally in the uriList list + * at the index of this segment id. However, if we are executing on + * MASTER ONLY the (one and only) entry which is destined for the master + * will be at the first entry of the uriList list. + */ + char *uri_str = NULL; + int segindex = GetQEIndex(); + + Value *v = NULL; + + v = (Value *) list_nth(uri_list, 0); + + if (v->type == T_Null) + uri_str = NULL; + else + uri_str = (char *) strVal(v); + + /* + * If a uri is assigned to us - get a reference to it. Some executors + * don't have a uri to scan (if # of uri's < # of primary segdbs). + * in which case uri will be NULL. If that's the case for this + * segdb set to no-op. + */ + if (uri_str) + { + /* set external source (uri) */ + file_scan_desc->fs_uri = uri_str; + ereport(DEBUG3, (errmsg_internal("fs_uri (%d) is set as %s", segindex, uri_str))); + /* NOTE: we delay actually opening the data source until external_getnext() */ + } + else + { + /* segdb has no work to do. set to no-op */ + file_scan_desc->fs_noop = true; + file_scan_desc->fs_uri = NULL; + } + + /* 2.3 Allocate values and nulls structure */ + TupleDesc tup_desc = RelationGetDescr(relation); + file_scan_desc->fs_tupDesc = tup_desc; + file_scan_desc->attr = tup_desc->attrs; + file_scan_desc->num_phys_attrs = tup_desc->natts; + + file_scan_desc->values = (Datum *) palloc( + file_scan_desc->num_phys_attrs * sizeof(Datum)); + file_scan_desc->nulls = (bool *) palloc( + file_scan_desc->num_phys_attrs * sizeof(bool)); + + /* 2.5 Allocate and initialize the structure which track data parsing state */ + file_scan_desc->fs_pstate = (CopyStateData *) palloc0( + sizeof(CopyStateData)); + + /* 2.5.1 Initialize basic information */ + CopyState pstate = file_scan_desc->fs_pstate; + pstate->fe_eof = false; + pstate->eol_type = EOL_UNKNOWN; + pstate->eol_str = NULL; + pstate->cur_relname = RelationGetRelationName(relation); + pstate->cur_lineno = 0; + pstate->err_loc_type = ROWNUM_ORIGINAL; + pstate->cur_attname = NULL; + pstate->raw_buf_done = true; /* true so we will read data in first run */ + pstate->line_done = true; + pstate->bytesread = 0; + pstate->custom = false; + pstate->header_line = false; + pstate->fill_missing = false; + pstate->line_buf_converted = false; + pstate->raw_buf_index = 0; + pstate->processed = 0; + pstate->filename = uri_str; + pstate->copy_dest = COPY_EXTERNAL_SOURCE; + pstate->missing_bytes = 0; + pstate->csv_mode = false; + pstate->custom = true; + pstate->custom_formatter_func = NULL; + pstate->custom_formatter_name = NULL; + pstate->rel = relation; + + /* 2.5.2 Setup encoding information */ + /* + * Set up encoding conversion info. Even if the client and server + * encodings are the same, we must apply pg_client_to_server() to validate + * data in multibyte encodings. + * + * Each external table specifies the encoding of its external data. We will + * therefore set a client encoding and client-to-server conversion procedure + * in here (server-to-client in WET) and these will be used in the data + * conversion routines (in copy.c CopyReadLineXXX(), etc). + */ + Insist(PG_VALID_ENCODING(fmt_encoding)); + pstate->client_encoding = fmt_encoding; + Oid conversion_proc = FindDefaultConversionProc(fmt_encoding, + GetDatabaseEncoding()); + + if (OidIsValid(conversion_proc)) + { + /* conversion proc found */ + pstate->enc_conversion_proc = palloc(sizeof(FmgrInfo)); + fmgr_info(conversion_proc, pstate->enc_conversion_proc); + } + else + { + /* no conversion function (both encodings are probably the same) */ + pstate->enc_conversion_proc = NULL; + } + + pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding(); + pstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY( + pstate->client_encoding); + + /* 2.5.3 Parse the data format options */ + char *format_str = pstrdup((char *) strVal(linitial(fmt_opts))); + + orc_parse_format_string(pstate, format_str); + + /* 2.5.4 Generate or convert list of attributes to process */ + pstate->attr_offsets = (int *) palloc(tup_desc->natts * sizeof(int)); + pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL); + + /* 2.5.5 Convert FORCE NOT NULL name list to per-column flags, check validity */ + pstate->force_notnull_flags = (bool *) palloc0( + tup_desc->natts * sizeof(bool)); + if (pstate->force_notnull) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + pstate->force_notnull_flags[attnum - 1] = true; + } + } + + /* 2.5.6 Take care of state that is RET specific */ + initStringInfo(&pstate->attribute_buf); + initStringInfo(&pstate->line_buf); + + /* Set up data buffer to hold a chunk of data */ + MemSet(pstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char)); + pstate->raw_buf[RAW_BUF_SIZE] = '\0'; + + /* 2.5.7 Create temporary memory context for per row process */ + /* + * Create a temporary memory context that we can reset once per row to + * recover palloc'd memory. This avoids any problems with leaks inside + * datatype input or output routines, and should be faster than retail + * pfree's anyway. + */ + pstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, + "ExtTableMemCxt", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* 2.6 Setup formatter information */ + file_scan_desc->fs_formatter = (FormatterData *) palloc0( + sizeof(FormatterData)); + initStringInfo(&file_scan_desc->fs_formatter->fmt_databuf); + file_scan_desc->fs_formatter->fmt_perrow_ctx = + file_scan_desc->fs_pstate->rowcontext; + file_scan_desc->fs_formatter->fmt_user_ctx = NULL; + + /* 2.7 Set up callback to identify error line number */ + file_scan_desc->errcontext.callback = orc_scan_error_callback; + file_scan_desc->errcontext.arg = (void *) file_scan_desc->fs_pstate; + file_scan_desc->errcontext.previous = error_context_stack; + + /* 3. Setup user data */ + /* 3.1 Initialize user data */ + ORCFormatUserData *user_data = palloc0(sizeof(ORCFormatUserData)); + init_format_user_data_for_read(tup_desc, user_data); + + /* 3.2 Create formatter instance */ + List *fmt_opts_defelem = pstate->custom_formatter_params; + user_data->fmt = create_formatter_instance(fmt_opts_defelem, fmt_encoding, + ps->ps_segno, tup_desc); + + /* 3.3 Build file splits */ + Uri *uri = ParseExternalTableUri(uri_str); + + if (enable_secure_filesystem) + { + char *token = find_filesystem_credential_with_uri(uri_str); + SetToken(uri_str, token); + } + file_scan_desc->fs_ps_scan_state = scan_state; /* for orc rescan */ + build_file_splits(uri, scan_state, user_data); + + FreeExternalTableUri(uri); + + /* 3.4 Build tuple description */ + Plan *plan = &(ext_scan->scan.plan); + file_scan_desc->fs_ps_plan = plan; + build_tuple_descrition_for_read(plan, relation, user_data); + + /* 3.5 Save user data */ + file_scan_desc->fs_ps_user_data = (void *) user_data; + /* 4. Begin scan with the formatter */ + ORCFormatBeginORCFormatC(user_data->fmt, user_data->splits, + user_data->nSplits, user_data->colToReads, user_data->colNames, + user_data->colDatatypes, user_data->colDatatypeMods, + user_data->numberOfColumns); + + ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) + { + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + + /* 5. Save file_scan_desc */ + ps->ps_file_scan_desc = file_scan_desc; + + PG_RETURN_POINTER(file_scan_desc); +} + +/* + * ExternalSelectDesc + * orc_getnext_init(PlanState *planState, + * ExternalScanState *extScanState) + */ +Datum orc_getnext_init(PG_FUNCTION_ARGS) +{ + PlugStorage ps = (PlugStorage) (fcinfo->context); + PlanState *plan_state = ps->ps_plan_state; + ExternalScanState *ext_scan_state = ps->ps_ext_scan_state; + + ExternalSelectDesc ext_select_desc = NULL; + /* + ExternalSelectDesc ext_select_desc = (ExternalSelectDesc)palloc0( + sizeof(ExternalSelectDescData)); + + Plan *rootPlan = NULL; + + if (plan_state != NULL) + { + ext_select_desc->projInfo = plan_state->ps_ProjInfo; + + // If we have an agg type then our parent is an Agg node + rootPlan = plan_state->state->es_plannedstmt->planTree; + if (IsA(rootPlan, Agg) && ext_scan_state->parent_agg_type) + { + ext_select_desc->agg_type = ext_scan_state->parent_agg_type; + } + } + */ + + ps->ps_ext_select_desc = ext_select_desc; + + PG_RETURN_POINTER(ext_select_desc); +} + +Datum orc_getnext(PG_FUNCTION_ARGS) { + PlugStorage ps = (PlugStorage)(fcinfo->context); + FileScanDesc fsd = ps->ps_file_scan_desc; + ORCFormatUserData *user_data = (ORCFormatUserData *)(fsd->fs_ps_user_data); + TupleTableSlot *slot = ps->ps_tuple_table_slot; + bool *nulls = slot_get_isnull(slot); + memset(nulls, true, user_data->numberOfColumns); + + bool res = ORCFormatNextORCFormatC(user_data->fmt, user_data->colRawValues, + user_data->colValLength, nulls); + if (res) { + for (int32_t i = 0; i < user_data->numberOfColumns; ++i) { + // Column not to read or column is null + if (nulls[i]) continue; + + switch (fsd->attr[i]->atttypid) { + case HAWQ_TYPE_BOOL: { + user_data->colValues[i] = + BoolGetDatum(*(bool *)(user_data->colRawValues[i])); + break; + } + case HAWQ_TYPE_INT2: { + user_data->colValues[i] = + Int16GetDatum(*(int16_t *)(user_data->colRawValues[i])); + break; + } + case HAWQ_TYPE_INT4: { + user_data->colValues[i] = + Int32GetDatum(*(int32_t *)(user_data->colRawValues[i])); + break; + } + case HAWQ_TYPE_INT8: + case HAWQ_TYPE_TIME: + case HAWQ_TYPE_TIMESTAMP: + case HAWQ_TYPE_TIMESTAMPTZ: { + user_data->colValues[i] = + Int64GetDatum(*(int64_t *)(user_data->colRawValues[i])); + break; + } + case HAWQ_TYPE_FLOAT4: { + user_data->colValues[i] = + Float4GetDatum(*(float *)(user_data->colRawValues[i])); + break; + } + case HAWQ_TYPE_FLOAT8: { + user_data->colValues[i] = + Float8GetDatum(*(double *)(user_data->colRawValues[i])); + break; + } + case HAWQ_TYPE_VARCHAR: + case HAWQ_TYPE_TEXT: + case HAWQ_TYPE_BPCHAR: + case HAWQ_TYPE_BYTE: + case HAWQ_TYPE_NUMERIC: { + SET_VARSIZE((struct varlena *)(user_data->colRawValues[i]), + user_data->colValLength[i]); + user_data->colValues[i] = PointerGetDatum(user_data->colRawValues[i]); + break; + } + case HAWQ_TYPE_DATE: { + user_data->colValues[i] = + Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]) - + POSTGRES_EPOCH_JDATE + UNIX_EPOCH_JDATE); + break; + } + default: { + ereport(ERROR, (errmsg_internal("ORC:%d", fsd->attr[i]->atttypid))); + + break; + } + } + } + + ps->ps_has_tuple = true; + slot->PRIVATE_tts_values = user_data->colValues; + TupSetVirtualTupleNValid(slot, user_data->numberOfColumns); + PG_RETURN_BOOL(true); + } + + ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) { + ORCFormatEndORCFormatC(user_data->fmt); + e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) { + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + + ORCFormatFreeORCFormatC(&(user_data->fmt)); + + pfree(user_data->colRawValues); + pfree(user_data->colValues); + pfree(user_data->colToReads); + pfree(user_data->colValLength); + if (user_data->splits != NULL) { + for (int i = 0; i < user_data->nSplits; ++i) { + pfree(user_data->splits[i].fileName); + } + pfree(user_data->splits); + } + for (int i = 0; i < user_data->numberOfColumns; ++i) { + pfree(user_data->colNames[i]); + } + + pfree(user_data->colNames); + pfree(user_data->colDatatypes); + pfree(user_data); + fsd->fs_ps_user_data = NULL; + + ps->ps_has_tuple = false; + slot->PRIVATE_tts_values = NULL; + ExecClearTuple(slot); + } else { + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + PG_RETURN_BOOL(false); +} + +/* + * void + * orc_rescan(FileScanDesc scan) + */ +Datum orc_rescan(PG_FUNCTION_ARGS) +{ + PlugStorage ps = (PlugStorage) (fcinfo->context); + FileScanDesc fsd = ps->ps_file_scan_desc; + Relation relation = fsd->fs_rd; + TupleDesc tup_desc = RelationGetDescr(relation); + + ORCFormatUserData *user_data = (ORCFormatUserData *) (fsd->fs_ps_user_data); + + if (user_data == NULL) + { + /* 1 Initialize user data */ + user_data = palloc0(sizeof(ORCFormatUserData)); + init_format_user_data_for_read(fsd->fs_tupDesc, user_data); + + /* 2 Create formatter instance */ + List *fmt_opts_defelem = fsd->fs_pstate->custom_formatter_params; + int fmt_encoding = fsd->fs_pstate->client_encoding; + user_data->fmt = create_formatter_instance(fmt_opts_defelem, + fmt_encoding, ps->ps_segno, tup_desc); + + /* 3 Build file splits */ + Uri *uri = ParseExternalTableUri(fsd->fs_uri); + build_file_splits(uri, fsd->fs_ps_scan_state, user_data); + + /* 4 Build tuple description */ + Plan *plan = fsd->fs_ps_plan; + build_tuple_descrition_for_read(plan, fsd->fs_rd, user_data); + + /* 5 Save user data */ + fsd->fs_ps_user_data = (void *) user_data; + + if (enable_secure_filesystem) + { + char *token = find_filesystem_credential_with_uri(fsd->fs_uri); + SetToken(fsd->fs_uri, token); + } + FreeExternalTableUri(uri); + /* 6 Begin scan with the formatter */ + ORCFormatBeginORCFormatC(user_data->fmt, user_data->splits, + user_data->nSplits, user_data->colToReads, user_data->colNames, + user_data->colDatatypes, user_data->colDatatypeMods, + user_data->numberOfColumns); + + ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) + { + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + } else { + ORCFormatRescanORCFormatC(user_data->fmt); + ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) + { + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + } + + /* reset some parse state variables */ + fsd->fs_pstate->fe_eof = false; + fsd->fs_pstate->cur_lineno = 0; + fsd->fs_pstate->cur_attname = NULL; + fsd->fs_pstate->raw_buf_done = true; /* true so we will read data in first run */ + fsd->fs_pstate->line_done = true; + fsd->fs_pstate->bytesread = 0; + + PG_RETURN_VOID() ; +} + +/* + * void + * orc_endscan(FileScanDesc scan) + */ +Datum orc_endscan(PG_FUNCTION_ARGS) +{ + PlugStorage ps = (PlugStorage) (fcinfo->context); + FileScanDesc fsd = ps->ps_file_scan_desc; + + /* Clean up scan descriptor */ + char *relname = pstrdup(RelationGetRelationName(fsd->fs_rd)); + + if (fsd->fs_pstate != NULL) + { /* - * TODO(wshao): additional check for orc decimal type - * orc format currently does not support decimal precision larger than 38 + * decrement relation reference count and free scan descriptor storage */ + RelationDecrementReferenceCount(fsd->fs_rd); } + if (fsd->values) + { + pfree(fsd->values); + fsd->values = NULL; + } + if (fsd->nulls) + { + pfree(fsd->nulls); + fsd->nulls = NULL; + } + + if (fsd->fs_pstate != NULL && fsd->fs_pstate->rowcontext != NULL) + { + /* + * delete the row context + */ + MemoryContextDelete(fsd->fs_pstate->rowcontext); + fsd->fs_pstate->rowcontext = NULL; + } + + if (fsd->fs_formatter) + { + /* TODO: check if this space is automatically freed. + * if not, then see what about freeing the user context */ + if (fsd->fs_formatter->fmt_databuf.data) + pfree(fsd->fs_formatter->fmt_databuf.data); + pfree(fsd->fs_formatter); + fsd->fs_formatter = NULL; + } + + /* + * free formatter information + */ + if (fsd->fs_formatter_name) + { + pfree(fsd->fs_formatter_name); + fsd->fs_formatter_name = NULL; + } + + /* + * free parse state memory + */ + if (fsd->fs_pstate != NULL) + { + if (fsd->fs_pstate->attribute_buf.data) + pfree(fsd->fs_pstate->attribute_buf.data); + if (fsd->fs_pstate->line_buf.data) + pfree(fsd->fs_pstate->line_buf.data); + if (fsd->fs_pstate->attr_offsets) + pfree(fsd->fs_pstate->attr_offsets); + if (fsd->fs_pstate->force_quote_flags) + pfree(fsd->fs_pstate->force_quote_flags); + if (fsd->fs_pstate->force_notnull_flags) + pfree(fsd->fs_pstate->force_notnull_flags); + + pfree(fsd->fs_pstate); + fsd->fs_pstate = NULL; + } + + /* clean up error context */ + error_context_stack = fsd->errcontext.previous; + + pfree(relname); + PG_RETURN_VOID() ; } /* + * void + * orc_stopscan(FileScanDesc scan) + */ +Datum orc_stopscan(PG_FUNCTION_ARGS) +{ + PlugStorage ps = (PlugStorage)(fcinfo->context); + FileScanDesc fsd = ps->ps_file_scan_desc; + ORCFormatUserData *user_data = (ORCFormatUserData *)(fsd->fs_ps_user_data); + TupleTableSlot *tts = ps->ps_tuple_table_slot; + + if (!user_data) PG_RETURN_VOID(); + + /* If there is no error caught, it should be an end of reading split */ + ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) { + ORCFormatEndORCFormatC(user_data->fmt); + e = ORCFormatGetErrorORCFormatC(user_data->fmt); + if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) { + ereport(LOG, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + + ORCFormatFreeORCFormatC(&(user_data->fmt)); + + pfree(user_data->colRawValues); + pfree(user_data->colValues); + if (user_data->colToReads) { + pfree(user_data->colToReads); + user_data->colToReads = NULL; + } + pfree(user_data->colValLength); + if (user_data->splits != NULL) { + for (int i = 0; i < user_data->nSplits; ++i) { + pfree(user_data->splits[i].fileName); + } + pfree(user_data->splits); + } + for (int i = 0; i < user_data->numberOfColumns; ++i) { + pfree(user_data->colNames[i]); + } + + pfree(user_data->colNames); + pfree(user_data->colDatatypes); + pfree(user_data); + fsd->fs_ps_user_data = NULL; + + /* form empty tuple */ + ps->ps_has_tuple = false; + + tts->PRIVATE_tts_values = NULL; + tts->PRIVATE_tts_isnull = NULL; + ExecClearTuple(tts); + } else { + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); + } + + PG_RETURN_VOID(); +} + +/* * ExternalInsertDesc * orc_insert_init(Relation relation, * int formatterType, @@ -582,8 +1239,7 @@ Datum orc_insert_init(PG_FUNCTION_ARGS) ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) { - elog(ERROR, "ORC: failed to begin insert: %s (%d)", - e->errMessage, e->errCode); + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); } /* 4. Save the result */ @@ -606,7 +1262,7 @@ Datum orc_insert(PG_FUNCTION_ARGS) ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data); user_data->colValues = slot_get_values(tts); - user_data->colIsNulls = slot_get_isnull(tts); + bool *nulls = slot_get_isnull(tts); static bool DUMMY_BOOL = true; static int8_t DUMMY_INT8 = 0; @@ -647,7 +1303,7 @@ Datum orc_insert(PG_FUNCTION_ARGS) user_data->colRawValues[i] = NULL; user_data->colValNullBitmap[i] = NULL; - if (user_data->colIsNulls[i]) + if (nulls[i]) { if (dataType == HAWQ_TYPE_CHAR) { @@ -692,7 +1348,8 @@ Datum orc_insert(PG_FUNCTION_ARGS) { user_data->colRawValues[i] = (char *) (&DUMMY_TIME); } - else if (dataType == HAWQ_TYPE_TIMESTAMP) + else if (dataType == HAWQ_TYPE_TIMESTAMP + || dataType == HAWQ_TYPE_TIMESTAMPTZ) { user_data->colRawValues[i] = (char *) (&DUMMY_TIMESTAMP); } @@ -718,12 +1375,11 @@ Datum orc_insert(PG_FUNCTION_ARGS) } else if (dataType == HAWQ_TYPE_INVALID) { - elog(ERROR, "HAWQ data type with id %d is invalid", dataType); + ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is invalid", dataType))); } else { - elog( - ERROR, "HAWQ data type with id %d is not supported yet", dataType); + ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is not supported yet", dataType))); } continue; @@ -736,14 +1392,18 @@ Datum orc_insert(PG_FUNCTION_ARGS) { user_data->colRawValues[i] = (char *) (&(user_data->colValues[i])); } - else if (dataType == HAWQ_TYPE_TIMESTAMP) + else if (dataType == HAWQ_TYPE_TIMESTAMP || dataType == HAWQ_TYPE_TIMESTAMPTZ) { int64_t *timestamp = (int64_t *) (&(user_data->colValues[i])); user_data->colTimestamp[i].second = *timestamp / 1000000 + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24; user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000; - if (user_data->colTimestamp[i].nanosecond < 0) + int64_t days = user_data->colTimestamp[i].second / 60 / 60 / 24; + if (user_data->colTimestamp[i].nanosecond < 0 && + (days > POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE || days < 0)) user_data->colTimestamp[i].nanosecond += 1000000000; + if(user_data->colTimestamp[i].second<0&&user_data->colTimestamp[i].nanosecond) + user_data->colTimestamp[i].second-=1; user_data->colRawValues[i] = (char *) (&(user_data->colTimestamp[i])); } @@ -791,8 +1451,8 @@ Datum orc_insert(PG_FUNCTION_ARGS) // Now we only support 1 dimension array if (ARR_NDIM(arr) > 1) { - elog(ERROR, "Now we only support 1 dimension array in orc format," - " your array dimension is %d", ARR_NDIM(arr)); + ereport(ERROR, (errmsg_internal("Now we only support 1 dimension array in orc format," + " your array dimension is %d", ARR_NDIM(arr)))); } else if (ARR_NDIM(arr) == 1) { @@ -805,12 +1465,12 @@ Datum orc_insert(PG_FUNCTION_ARGS) } else if (dataType == HAWQ_TYPE_INVALID) { - elog(ERROR, "HAWQ data type with id %d is invalid", dataType); + ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is invalid", dataType))); } else { - elog( - ERROR, "HAWQ data type with id %d is not supported yet", dataType); + ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is not supported yet", dataType))); + } } @@ -818,20 +1478,19 @@ Datum orc_insert(PG_FUNCTION_ARGS) ORCFormatInsertORCFormatC(user_data->fmt, user_data->colDatatypes, user_data->colRawValues, user_data->colValLength, user_data->colValNullBitmap, user_data->colValDims, - user_data->colIsNulls); + nulls); ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) { - elog(ERROR, "orc_insert: failed to insert: %s(%d)", - e->errMessage, e->errCode); + ereport(ERROR,(errcode(e->errCode),errmsg("ORC::%s", e->errMessage))); } for (int i = 0; i < user_data->numberOfColumns; ++i) { int dataType = (int) (tupdesc->attrs[i]->atttypid); - if (user_data->colIsNulls[i]) + if (nulls[i]) { continue; } @@ -869,8 +1528,7 @@ Datum orc_insert_finish(PG_FUNCTION_ARGS) ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) { - elog(ERROR, "ORC: failed to finish insert: %s(%d)", - e->errMessage, e->errCode); + ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); } ORCFormatFreeORCFormatC(&(user_data->fmt)); @@ -878,7 +1536,6 @@ Datum orc_insert_finish(PG_FUNCTION_ARGS) pfree(user_data->colDatatypes); pfree(user_data->colRawValues); pfree(user_data->colValLength); - pfree(user_data->colAddresses); for (int i = 0; i < user_data->numberOfColumns; ++i) { pfree(user_data->colNames[i]); @@ -913,13 +1570,33 @@ static FmgrInfo *get_orc_function(char *formatter_name, char *function_name) } else { - elog(ERROR, "%s_%s function was not found for pluggable storage", - formatter_name, function_name); + ereport(ERROR, (errmsg_internal("%s_%s function was not found for pluggable storage", + formatter_name, function_name))); } return procInfo; } +static void get_scan_functions(FileScanDesc file_scan_desc) +{ + file_scan_desc->fs_ps_scan_funcs.beginscan = get_orc_function("orc", + "beginscan"); + + file_scan_desc->fs_ps_scan_funcs.getnext_init = get_orc_function("orc", + "getnext_init"); + + file_scan_desc->fs_ps_scan_funcs.getnext = get_orc_function("orc", + "getnext"); + + file_scan_desc->fs_ps_scan_funcs.rescan = get_orc_function("orc", "rescan"); + + file_scan_desc->fs_ps_scan_funcs.endscan = get_orc_function("orc", + "endscan"); + + file_scan_desc->fs_ps_scan_funcs.stopscan = get_orc_function("orc", + "stopscan"); +} + static void get_insert_functions(ExternalInsertDesc ext_insert_desc) { ext_insert_desc->ext_ps_insert_funcs.insert_init = get_orc_function("orc", @@ -932,6 +1609,33 @@ static void get_insert_functions(ExternalInsertDesc ext_insert_desc) "insert_finish"); } +static void init_format_user_data_for_read(TupleDesc tup_desc, + ORCFormatUserData *user_data) +{ + user_data->numberOfColumns = tup_desc->natts; + user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns); + user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns); + user_data->colDatatypeMods = palloc0( + sizeof(int64_t) * user_data->numberOfColumns); + user_data->colValues = palloc0(sizeof(Datum) * user_data->numberOfColumns); + user_data->colRawValues = palloc0( + sizeof(char *) * user_data->numberOfColumns); + user_data->colValLength = palloc0( + sizeof(uint64_t) * user_data->numberOfColumns); + user_data->colValNullBitmap = palloc0( + sizeof(bits8 *) * user_data->numberOfColumns); + + for (int i = 0; i < user_data->numberOfColumns; i++) + { + user_data->colNames[i] = NULL; + user_data->colValues[i] = NULL; + user_data->colRawValues[i] = NULL; + user_data->colValLength[i] = 0; + user_data->colValNullBitmap[i] = (bits8 *) palloc0( + sizeof(bits8) * MAX_ORC_ARRAY_DIMS); + } +} + static void init_format_user_data_for_write(TupleDesc tup_desc, ORCFormatUserData *user_data) { @@ -947,8 +1651,6 @@ static void init_format_user_data_for_write(TupleDesc tup_desc, user_data->colValNullBitmap = palloc0( sizeof(bits8 *) * user_data->numberOfColumns); user_data->colValDims = palloc0(sizeof(int *) * user_data->numberOfColumns); - user_data->colAddresses = palloc0( - sizeof(char *) * user_data->numberOfColumns); user_data->colTimestamp = palloc0( sizeof(TimestampType) * user_data->numberOfColumns); } @@ -1016,7 +1718,7 @@ static void build_options_in_json(List *fmt_opts_defelem, int encoding, strcpy(*json_str, str); json_object_put(opt_json_object); - elog(DEBUG3, "formatter options are %s", *json_str); + ereport(DEBUG3, (errmsg_internal("formatter options are %s", *json_str))); } } @@ -1037,6 +1739,80 @@ static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem, return orc_format_c; } +static void build_file_splits(Uri *uri, ScanState *scan_state, + ORCFormatUserData *user_data) +{ + if (scan_state) + { + user_data->nSplits = list_length(scan_state->splits); + user_data->splits = palloc0( + sizeof(ORCFormatFileSplit) * user_data->nSplits); + ListCell *cell = NULL; + int i = 0; + foreach(cell, scan_state->splits) + { + FileSplit origFS = (FileSplit) lfirst(cell); + user_data->splits[i].len = origFS->lengths; + user_data->splits[i].start = origFS->offsets; + if (uri->protocol == URI_HIVE) + { + uri->hostname = + (scan_state->hivehost) ? + pstrdup(scan_state->hivehost) : NULL; + uri->port = scan_state->hiveport; + } + /* build file path containing host address */ + int fileNameLen = sizeof("hdfs") - 1 + /* *** This is a bad imp. */ + sizeof("://") - 1 + strlen(uri->hostname) + sizeof(':') + + sizeof("65535") - 1 + strlen(origFS->ext_file_uri_string) + + sizeof('\0'); + + user_data->splits[i].fileName = palloc(fileNameLen * sizeof(char)); + sprintf(user_data->splits[i].fileName, "%s://%s:%d%s", "hdfs", + uri->hostname, uri->port, origFS->ext_file_uri_string); + ereport(LOG, (errmsg_internal("fileinformation:%s",user_data->splits[i].fileName))); + + i++; + } + } + else + { + user_data->nSplits = 0; + user_data->splits = NULL; + } +} + +static void build_tuple_descrition_for_read(Plan *plan, Relation relation, + ORCFormatUserData *user_data) +{ + user_data->colToReads = palloc0(sizeof(bool) * user_data->numberOfColumns); + + for (int i = 0; i < user_data->numberOfColumns; ++i) + { + user_data->colToReads[i] = plan ? false : true; + + /* 64 is the name type length */ + user_data->colNames[i] = palloc(sizeof(char) * 64); + + strcpy(user_data->colNames[i], + relation->rd_att->attrs[i]->attname.data); + + int data_type = (int) (relation->rd_att->attrs[i]->atttypid); + user_data->colDatatypes[i] = map_hawq_type_to_common_plan(data_type); + user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod; + } + + if (plan) + { + /* calculate columns to read for seqscan */ + GetNeededColumnsForScan((Node *) plan->targetlist, + user_data->colToReads, user_data->numberOfColumns); + + GetNeededColumnsForScan((Node *) plan->qual, user_data->colToReads, + user_data->numberOfColumns); + } +} + static void build_tuple_descrition_for_write(Relation relation, ORCFormatUserData *user_data) { @@ -1051,10 +1827,23 @@ static void build_tuple_descrition_for_write(Relation relation, user_data->colDatatypes[i] = map_hawq_type_to_common_plan( (int) (relation->rd_att->attrs[i]->atttypid)); - user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod; + if (user_data->colDatatypes[i] == CHARID && + relation->rd_att->attrs[i]->atttypmod == -1) { + user_data->colDatatypeMods[i] = + strlen(relation->rd_att->attrs[i]->attname.data) + VARHDRSZ; + } else { + user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod; + } } } +static void orc_scan_error_callback(void *arg) +{ + CopyState cstate = (CopyState) arg; + + errcontext("External table %s", cstate->cur_relname); +} + static void orc_parse_format_string(CopyState pstate, char *fmtstr) { char *token; diff --git a/contrib/orc/orc_init.sql b/contrib/orc/orc_init.sql index cae2318..e3773ef 100644 --- a/contrib/orc/orc_init.sql +++ b/contrib/orc/orc_init.sql @@ -21,7 +21,7 @@ LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION pg_catalog.orc_validate_datatypes() RETURNS void AS '$libdir/orc.so', 'orc_validate_datatypes' LANGUAGE C STABLE; -/* + CREATE OR REPLACE FUNCTION pg_catalog.orc_beginscan() RETURNS bytea AS '$libdir/orc.so', 'orc_beginscan' LANGUAGE C STABLE; @@ -45,7 +45,7 @@ LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION pg_catalog.orc_stopscan() RETURNS void AS '$libdir/orc.so', 'orc_stopscan' LANGUAGE C STABLE; -*/ + CREATE OR REPLACE FUNCTION pg_catalog.orc_insert_init() RETURNS bytea AS '$libdir/orc.so', 'orc_insert_init'
