GitHub user huor commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1384#discussion_r208434847
  
    --- Diff: src/backend/cdb/cdbdatalocality.c ---
    @@ -1579,7 +1700,242 @@ static void ParquetGetSegFileDataLocation(Relation 
relation,
        return;
     }
     
    +static void InvokeHDFSProtocolBlockLocation(Oid    procOid,
    +                                            List  *locs,
    +                                            List **blockLocations)
    +{  /*
    +    * Invoke the function identified by procOid through the fmgr call
    +    * interface with zero SQL arguments. The request is carried in the call
    +    * context as an ExtProtocolValidatorData node (url_list = locs,
    +    * direction = EXT_VALIDATE_READ, action = EXT_VALID_ACT_GETBLKLOC).
    +    * After the call, fcinfo.resultinfo is read as an
    +    * ExtProtocolBlockLocationData; when it is non-NULL, every entry of its
    +    * files list is logged at DEBUG3 and then moved onto *blockLocations.
    +    *
    +    * procOid        - OID of the function to call.
    +    * locs           - list stored as url_list of the request node.
    +    * blockLocations - in/out list; entries are appended with lappend.
    +    */
    +   ExtProtocolValidatorData   *validator_data;
    +   FmgrInfo                                   *validator_udf;
    +   FunctionCallInfoData            fcinfo;
    +
    +   validator_data = (ExtProtocolValidatorData *)
    +                                    palloc0 
(sizeof(ExtProtocolValidatorData));
    +   validator_udf = palloc(sizeof(FmgrInfo));
    +   fmgr_info(procOid, validator_udf); /* fill validator_udf for procOid */
    +
    +   validator_data->type            = T_ExtProtocolValidatorData;
    +   validator_data->url_list        = locs;
    +   validator_data->format_opts = NULL;
    +   validator_data->errmsg          = NULL;
    +   validator_data->direction       = EXT_VALIDATE_READ;
    +   validator_data->action          = EXT_VALID_ACT_GETBLKLOC;
    +
    +   InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
    +                                                    /* FmgrInfo */ 
validator_udf,
    +                                                    /* nArgs */ 0,
    +                                                    /* Call Context */ 
(Node *) validator_data,
    +                                                    /* ResultSetInfo */ 
NULL);
    +
    +   /* invoke validator. if this function returns - validation passed */
    +   FunctionCallInvoke(&fcinfo);
    +
    +   ExtProtocolBlockLocationData *bls =
    +           (ExtProtocolBlockLocationData *)(fcinfo.resultinfo); /*
    +    * resultinfo was passed in as NULL above, so a non-NULL value here was
    +    * stored during the call.
    +    */
    +   /* debug output block location. */
    +   if (bls != NULL)
    +   {
    +           ListCell *c;
    +           foreach(c, bls->files)
    +           {
    +                   blocklocation_file *blf = (blocklocation_file 
*)(lfirst(c));
    +                   elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
    +                                        blf->file_uri, blf->block_num);
    +                   for ( int i = 0 ; i < blf->block_num ; ++i )
    +                   {
    +                           BlockLocation *pbl = &(blf->locations[i]);
    +                           elog(DEBUG3, "DEBUG LOCATION for block %d : %d, 
"
    +                                                INT64_FORMAT ", " 
INT64_FORMAT ", %d",
    +                                                i,
    +                                                pbl->corrupt, pbl->length, 
pbl->offset,
    +                                                    pbl->numOfNodes);
    +                           for ( int j = 0 ; j < pbl->numOfNodes ; ++j )
    +                           {
    +                                   elog(DEBUG3, "DEBUG LOCATION for block 
%d : %s, %s, %s",
    +                                                        i,
    +                                                        pbl->hosts[j], 
pbl->names[j],
    +                                                            
pbl->topologyPaths[j]);
    +                           }
    +                   }
    +           }
    +   }
     
    +   elog(DEBUG3, "after invoking get block location API");
    +
    +   /* get location data from fcinfo.resultinfo. */
    +   if (bls != NULL)
    +   {
    +           Assert(bls->type == T_ExtProtocolBlockLocationData); /*
    +            * NOTE(review): this node-type check runs only after the debug
    +            * loop above has already walked bls->files.
    +            */
    +           while(list_length(bls->files) > 0) /* drain bls->files */
    +           {
    +                   void *v = lfirst(list_head(bls->files));
    +                   bls->files = list_delete_first(bls->files);
    +                   *blockLocations = lappend(*blockLocations, v); /* pointer is moved, not copied */
    +           }
    +   }
    +   pfree(validator_data); /* bls itself is not freed in this function */
    +   pfree(validator_udf);
    +}
    +
    +Oid
    +LookupCustomProtocolBlockLocationFunc(char *protoname)
    +{  /*
    +    * Look up the zero-argument function named <protoname>_blocklocation
    +    * and return its OID. Reports ERROR when no such function is found,
    +    * when its return type is not VOIDOID, or when it is not declared
    +    * STABLE.
    +    *
    +    * protoname - protocol name used as the prefix of the function name.
    +    */
    +   List*   funcname        = NIL;
    +   Oid             procOid         = InvalidOid;
    +   Oid             argList[1]; /* never initialized; the lookup below passes nargs = 0 */
    +   Oid             returnOid;
    +
    +   char*   new_func_name = (char *)palloc0(strlen(protoname) + 16); /*
    +    * The 16 extra bytes cover the 14-character suffix _blocklocation plus
    +    * the terminating NUL.
    +    */
    +   sprintf(new_func_name, "%s_blocklocation", protoname);
    +   funcname = lappend(funcname, makeString(new_func_name));
    +   returnOid = VOIDOID;
    +   procOid = LookupFuncName(funcname, 0, argList, true); /*
    +    * Presumably the final true asks for InvalidOid instead of an error on
    +    * a miss, which the check below then reports - confirm against the
    +    * LookupFuncName declaration.
    +    */
    +
    +   if (!OidIsValid(procOid))
    +           ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
    +                                           errmsg("protocol function %s 
was not found.",
    +                                                           new_func_name),
    +                                           errhint("Create it with CREATE 
FUNCTION."),
    +                                           errOmitLocation(true)));
    +
    +   /* check return type matches */
    +   if (get_func_rettype(procOid) != returnOid)
    +           ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
    +                                           errmsg("protocol function %s 
has an incorrect return type",
    +                                                           new_func_name),
    +                                           errOmitLocation(true)));
    +
    +   /* check allowed volatility */
    +   if (func_volatile(procOid) != PROVOLATILE_STABLE)
    +           ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
    +                                           errmsg("protocol function %s is 
not declared STABLE.",
    +                                           new_func_name),
    +                                           errOmitLocation(true)));
    +   pfree(new_func_name); /*
    +    * NOTE(review): funcname still holds the String node built from
    +    * new_func_name. Assuming makeString stores the pointer without
    +    * copying, the list now refers to freed memory; it is not read again
    +    * in this function.
    +    */
    +
    +   return procOid;
    +}
    +
    +static void ExternalGetHdfsFileDataLocation(
    +                           Relation relation,
    +                           split_to_segment_mapping_context *context,
    +                           int64 splitsize,
    +                           Relation_Data *rel_data,
    +                           int* allblocks) {
    +   ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
    +   Assert(ext_entry->locations != NIL);
    +   int64 total_size = 0;
    +   int segno = 1;
    +
    +   /*
    +    * Step 1. get external HDFS location from URI.
    +    */
    +   char* first_uri_str = (char *) 
strVal(lfirst(list_head(ext_entry->locations)));
    +   /* We must have at least one location. */
    +   Assert(first_uri_str != NULL);
    +   Uri* uri = ParseExternalTableUri(first_uri_str);
    +   bool isHdfs = false;
    +   if (uri != NULL && is_hdfs_protocol(uri)) {
    +           isHdfs = true;
    +   }
    +   Assert(isHdfs);  /* Currently, we accept HDFS only. */
    +
    +    /*
    +     * Step 2. Get function to call for getting location information. This 
work
    +     * is done by validator function registered for this external protocol.
    +     */
    +   Oid procOid = InvalidOid;
    +   if (isHdfs) {
    +           procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
    +   }
    +   else
    +   {
    +           Assert(false);
    +   }
    +
    +   /*
    +    * Step 3. Call validator to get location data.
    +    */
    +
    +   /* Prepare function call parameter by passing into location string. 
This is
    +    * only called at dispatcher side. */
    +   List *bls = NULL; /* Block locations */
    +   if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
    +   {
    +           InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, 
&bls);
    +   }
    +
    +   /*
    +    * Step 4. Build data location info for optimization after this call.
    +    */
    +
    +   /* Go through each files */
    +   ListCell *cbl = NULL;
    +   foreach(cbl, bls)
    +   {
    +           blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
    +           BlockLocation *locations = f->locations;
    +           int block_num = f->block_num;
    +           int64 logic_len = 0;
    +           *allblocks += block_num;
    +           if ((locations != NULL) && (block_num > 0)) {
    +                   // calculate length for one specific file
    +                   for (int j = 0; j < block_num; ++j) {
    +                           logic_len += locations[j].length;
    +           //              locations[j].lowerBoundInc = NULL;
    +           //              locations[j].upperBoundExc = NULL;
    +                   }
    +                   total_size += logic_len;
    +
    +                   Block_Host_Index * host_index = 
update_data_dist_stat(context,
    +                                   locations, block_num);
    +
    +                   Relation_File *file = (Relation_File *) 
palloc(sizeof(Relation_File));
    +                   if (atoi(strrchr(f->file_uri, '/') + 1) > 0)
    +                     file->segno = atoi(strrchr(f->file_uri, '/') + 1);
    +                   else
    +                     file->segno = segno++;
    +                   file->block_num = block_num;
    +                   file->locations = locations;
    +                   file->hostIDs = host_index;
    +                   file->logic_len = logic_len;
    +
    +                   // do the split logic
    +                   int realSplitNum = 0;
    +                   int split_num = file->block_num;
    +                   int64 offset = 0;
    +                   File_Split *splits = (File_Split *) 
palloc(sizeof(File_Split) * split_num);
    +                   while (realSplitNum < split_num) {
    +                                   splits[realSplitNum].host = -1;
    +                                   splits[realSplitNum].is_local_read = 
true;
    +                                   splits[realSplitNum].offset = offset;
    +                                   splits[realSplitNum].length = 
file->locations[realSplitNum].length;
    +                                   splits[realSplitNum].logiceof = 
logic_len;
    +                                   splits[realSplitNum].ext_file_uri = 
pstrdup(f->file_uri);
    +                                   splits[realSplitNum].fs_lb_len = 0;
    +                                   splits[realSplitNum].fs_ub_len = 0;
    +                           //      
file->locations[realSplitNum].lowerBoundInc = NULL;
    +                                   splits[realSplitNum].lower_bound_inc = 
NULL;
    --- End diff --
    
    Remove


---

Reply via email to