Github user zellerh commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/772#discussion_r85250548
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -1700,6 +1711,138 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
       return NULL;
     }
     
    +void ExHdfsScanTcb::computeRangesAtRuntime()
    +{
    +  int numFiles = 0;
    +  Int64 totalSize = 0;
    +  Int64 myShare = 0;
    +  Int64 runningSum = 0;
    +  Int64 myStartPositionInBytes = 0;
    +  Int64 firstFileStartingOffset = 0;
    +  Int64 lastFileBytesToRead = -1;
    +  Int32 numParallelInstances = MAXOF(getGlobals()->getNumOfInstances(),1);
    +  hdfsFS fs = ((GetCliGlobals()->currContext())->getHdfsServerConnection(
    +                    hdfsScanTdb().hostName_,
    +                    hdfsScanTdb().port_));
    +  hdfsFileInfo *fileInfos = hdfsListDirectory(fs,
    +                                              hdfsScanTdb().hdfsRootDir_,
    +                                              &numFiles);
    +
    +  if (runTimeRanges_)
    +    deallocateRuntimeRanges();
    +
    +  // in a first round, count the total number of bytes
    +  for (int f=0; f<numFiles; f++)
    +    {
    +      ex_assert(fileInfos[f].mKind == kObjectKindFile,
    +                "subdirectories not supported with runtime HDFS ranges");
    +      totalSize += (Int64) fileInfos[f].mSize;
    +    }
    +
    +  // compute my share, in bytes
    +  // (the last of the ESPs may read a bit more)
    +  myShare = totalSize / numParallelInstances;
    +  myStartPositionInBytes = myInstNum_ * myShare;
    +  beginRangeNum_ = -1;
    +  numRanges_ = 0;
    +
    +  if (totalSize > 0)
    +    {
    +      // second round, find out the range of files I need to read
    +      for (int g=0; g<numFiles; g++)
    +        {
    +          Int64 prevSum = runningSum;
    +
    +          runningSum += (Int64) fileInfos[g].mSize;
    +
    +          if (runningSum >= myStartPositionInBytes)
    +            {
    +              if (beginRangeNum_ < 0)
    +                {
    +                  // I have reached the first file that I need to read
    +                  beginRangeNum_ = g;
    +                  firstFileStartingOffset =
    +                    myStartPositionInBytes - prevSum;
    +                }
    +
    +              numRanges_++;
    +
    +              if (runningSum > (myStartPositionInBytes + myShare) &&
    +                  myInstNum_ < numParallelInstances-1)
    +                {
    +                  // the next file is beyond the range that I need to read
    +                  lastFileBytesToRead =
    +                    myStartPositionInBytes + myShare - prevSum;
    +                  break;
    +                }
    +            }
    +        }
    +
    +      // now that we know how many ranges we need, allocate them
    +      numRunTimeRanges_ = numRanges_;
    +      runTimeRanges_ = new(getHeap()) HdfsFileInfo[numRunTimeRanges_];
    +    }
    +  else
    +    beginRangeNum_ = 0;
    +
    +  // third round, populate the ranges that this ESP needs to read
    +  for (int h=beginRangeNum_; h<beginRangeNum_+numRanges_; h++)
    --- End diff ---
    
    Yes, this could be improved. I thought of a scheme where we try to
    assign each ESP a file that is local to its node. The solution we have
    here is very similar to what the optimizer did in an earlier version of
    the code. And, yes, there is logic to read past the end of the assigned
    range to finish a record and to skip partial records at the beginning
    of the range.
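
    For illustration only, here is a minimal standalone sketch of that
    boundary convention, assuming newline-delimited text and a plain local
    file instead of libhdfs; the function name and signature are made up
    for this example and are not part of the Trafodion code:

        #include <cstdio>
        #include <string>
        #include <vector>

        // Hypothetical helper: read the records assigned to one reader of
        // the byte range [rangeStart, rangeEnd). A reader that does not
        // start at offset 0 skips the partial record at the beginning of
        // its range (the previous reader finishes it), and every reader
        // keeps reading past rangeEnd to complete the last record it
        // started.
        std::vector<std::string> readDelimitedRange(const char *path,
                                                    long rangeStart,
                                                    long rangeEnd)
        {
          std::vector<std::string> records;
          std::FILE *f = std::fopen(path, "rb");
          int c = 0;
          long pos = rangeStart;

          if (!f)
            return records;

          std::fseek(f, rangeStart, SEEK_SET);

          // skip the partial record at the start of the range
          if (rangeStart > 0)
            {
              while ((c = std::fgetc(f)) != EOF && c != '\n')
                pos++;
              if (c == '\n')
                pos++;
            }

          // read every record that *starts* before rangeEnd, even if it
          // extends past rangeEnd into the next reader's range
          while (pos < rangeEnd && c != EOF)
            {
              std::string rec;

              while ((c = std::fgetc(f)) != EOF && c != '\n')
                {
                  rec.push_back((char) c);
                  pos++;
                }
              pos++; // the delimiter (or the EOF position)

              if (c != EOF || !rec.empty())
                records.push_back(rec);
            }

          std::fclose(f);
          return records;
        }

    The actual scanner reads through libhdfs buffers rather than a stdio
    stream, but the boundary rule it applies is the one described above.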

