Github user huor commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434847 --- Diff: src/backend/cdb/cdbdatalocality.c --- @@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation relation, return; } +static void InvokeHDFSProtocolBlockLocation(Oid procOid, + List *locs, + List **blockLocations) +{ + ExtProtocolValidatorData *validator_data; + FmgrInfo *validator_udf; + FunctionCallInfoData fcinfo; + + validator_data = (ExtProtocolValidatorData *) + palloc0 (sizeof(ExtProtocolValidatorData)); + validator_udf = palloc(sizeof(FmgrInfo)); + fmgr_info(procOid, validator_udf); + + validator_data->type = T_ExtProtocolValidatorData; + validator_data->url_list = locs; + validator_data->format_opts = NULL; + validator_data->errmsg = NULL; + validator_data->direction = EXT_VALIDATE_READ; + validator_data->action = EXT_VALID_ACT_GETBLKLOC; + + InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, + /* FmgrInfo */ validator_udf, + /* nArgs */ 0, + /* Call Context */ (Node *) validator_data, + /* ResultSetInfo */ NULL); + + /* invoke validator. if this function returns - validation passed */ + FunctionCallInvoke(&fcinfo); + + ExtProtocolBlockLocationData *bls = + (ExtProtocolBlockLocationData *)(fcinfo.resultinfo); + /* debug output block location. */ + if (bls != NULL) + { + ListCell *c; + foreach(c, bls->files) + { + blocklocation_file *blf = (blocklocation_file *)(lfirst(c)); + elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks", + blf->file_uri, blf->block_num); + for ( int i = 0 ; i < blf->block_num ; ++i ) + { + BlockLocation *pbl = &(blf->locations[i]); + elog(DEBUG3, "DEBUG LOCATION for block %d : %d, " + INT64_FORMAT ", " INT64_FORMAT ", %d", + i, + pbl->corrupt, pbl->length, pbl->offset, + pbl->numOfNodes); + for ( int j = 0 ; j < pbl->numOfNodes ; ++j ) + { + elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s", + i, + pbl->hosts[j], pbl->names[j], + pbl->topologyPaths[j]); + } + } + } + } + elog(DEBUG3, "after invoking get block location API"); + + /* get location data from fcinfo.resultinfo. */ + if (bls != NULL) + { + Assert(bls->type == T_ExtProtocolBlockLocationData); + while(list_length(bls->files) > 0) + { + void *v = lfirst(list_head(bls->files)); + bls->files = list_delete_first(bls->files); + *blockLocations = lappend(*blockLocations, v); + } + } + pfree(validator_data); + pfree(validator_udf); +} + +Oid +LookupCustomProtocolBlockLocationFunc(char *protoname) +{ + List* funcname = NIL; + Oid procOid = InvalidOid; + Oid argList[1]; + Oid returnOid; + + char* new_func_name = (char *)palloc0(strlen(protoname) + 16); + sprintf(new_func_name, "%s_blocklocation", protoname); + funcname = lappend(funcname, makeString(new_func_name)); + returnOid = VOIDOID; + procOid = LookupFuncName(funcname, 0, argList, true); + + if (!OidIsValid(procOid)) + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("protocol function %s was not found.", + new_func_name), + errhint("Create it with CREATE FUNCTION."), + errOmitLocation(true))); + + /* check return type matches */ + if (get_func_rettype(procOid) != returnOid) + ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("protocol function %s has an incorrect return type", + new_func_name), + errOmitLocation(true))); + + /* check allowed volatility */ + if (func_volatile(procOid) != PROVOLATILE_STABLE) + ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("protocol function %s is not declared STABLE.", + new_func_name), + errOmitLocation(true))); + pfree(new_func_name); + + return procOid; +} + +static void ExternalGetHdfsFileDataLocation( + Relation relation, + split_to_segment_mapping_context *context, + int64 splitsize, + Relation_Data *rel_data, + int* allblocks) { + ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid); + Assert(ext_entry->locations != NIL); + int64 total_size = 0; + int segno = 1; + + /* + * Step 1. get external HDFS location from URI. + */ + char* first_uri_str = (char *) strVal(lfirst(list_head(ext_entry->locations))); + /* We must have at least one location. */ + Assert(first_uri_str != NULL); + Uri* uri = ParseExternalTableUri(first_uri_str); + bool isHdfs = false; + if (uri != NULL && is_hdfs_protocol(uri)) { + isHdfs = true; + } + Assert(isHdfs); /* Currently, we accept HDFS only. */ + + /* + * Step 2. Get function to call for getting location information. This work + * is done by validator function registered for this external protocol. + */ + Oid procOid = InvalidOid; + if (isHdfs) { + procOid = LookupCustomProtocolBlockLocationFunc("hdfs"); + } + else + { + Assert(false); + } + + /* + * Step 3. Call validator to get location data. + */ + + /* Prepare function call parameter by passing into location string. This is + * only called at dispatcher side. */ + List *bls = NULL; /* Block locations */ + if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH) + { + InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls); + } + + /* + * Step 4. Build data location info for optimization after this call. + */ + + /* Go through each files */ + ListCell *cbl = NULL; + foreach(cbl, bls) + { + blocklocation_file *f = (blocklocation_file *)lfirst(cbl); + BlockLocation *locations = f->locations; + int block_num = f->block_num; + int64 logic_len = 0; + *allblocks += block_num; + if ((locations != NULL) && (block_num > 0)) { + // calculate length for one specific file + for (int j = 0; j < block_num; ++j) { + logic_len += locations[j].length; + // locations[j].lowerBoundInc = NULL; + // locations[j].upperBoundExc = NULL; + } + total_size += logic_len; + + Block_Host_Index * host_index = update_data_dist_stat(context, + locations, block_num); + + Relation_File *file = (Relation_File *) palloc(sizeof(Relation_File)); + if (atoi(strrchr(f->file_uri, '/') + 1) > 0) + file->segno = atoi(strrchr(f->file_uri, '/') + 1); + else + file->segno = segno++; + file->block_num = block_num; + file->locations = locations; + file->hostIDs = host_index; + file->logic_len = logic_len; + + // do the split logic + int realSplitNum = 0; + int split_num = file->block_num; + int64 offset = 0; + File_Split *splits = (File_Split *) palloc(sizeof(File_Split) * split_num); + while (realSplitNum < split_num) { + splits[realSplitNum].host = -1; + splits[realSplitNum].is_local_read = true; + splits[realSplitNum].offset = offset; + splits[realSplitNum].length = file->locations[realSplitNum].length; + splits[realSplitNum].logiceof = logic_len; + splits[realSplitNum].ext_file_uri = pstrdup(f->file_uri); + splits[realSplitNum].fs_lb_len = 0; + splits[realSplitNum].fs_ub_len = 0; + // file->locations[realSplitNum].lowerBoundInc = NULL; + splits[realSplitNum].lower_bound_inc = NULL; --- End diff -- Remove
---