Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r196625677
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
    @@ -515,12 +574,73 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th
         return split;
       }
     
    +  private List<CarbonInputSplit> 
convertToInputSplit4ExternalFormat(JobContext jobContext,
    +      ExtendedBlocklet extendedBlocklet) throws IOException {
    +    List<CarbonInputSplit> splits = new ArrayList<CarbonInputSplit>();
    +    String factFilePath = extendedBlocklet.getFilePath();
    +    Path path = new Path(factFilePath);
    +    FileSystem fs = FileFactory.getFileSystem(path);
    +    FileStatus fileStatus = fs.getFileStatus(path);
    +    long length = fileStatus.getLen();
    +    if (length != 0) {
    +      BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, 
length);
    +      long blkSize = fileStatus.getBlockSize();
    +      long minSplitSize = Math.max(getFormatMinSplitSize(), 
getMinSplitSize(jobContext));
    +      long maxSplitSize = getMaxSplitSize(jobContext);
    +      long splitSize = computeSplitSize(blkSize, minSplitSize, 
maxSplitSize);
    +      long bytesRemaining = fileStatus.getLen();
    +      while (((double) bytesRemaining) / splitSize > 1.1) {
    +        int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    +        splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), 
path,
    +            length - bytesRemaining,
    +            splitSize, blkLocations[blkIndex].getHosts(),
    +            blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
    +        bytesRemaining -= splitSize;
    +      }
    +      if (bytesRemaining != 0) {
    +        int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    +        splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), 
path,
    +            length - bytesRemaining,
    +            bytesRemaining, blkLocations[blkIndex].getHosts(),
    +            blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
    +      }
    +    } else {
    +      splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), 
path, 0, length,
    +          new String[0], FileFormat.EXTERNAL));
    +    }
    +    return splits;
    +  }
    +
       @Override public RecordReader<Void, T> createRecordReader(InputSplit 
inputSplit,
           TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
         Configuration configuration = taskAttemptContext.getConfiguration();
         QueryModel queryModel = createQueryModel(inputSplit, 
taskAttemptContext);
         CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
    -    return new CarbonRecordReader<T>(queryModel, readSupport);
    +    if (inputSplit instanceof CarbonMultiBlockSplit
    +        && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == 
FileFormat.EXTERNAL) {
    +      return createRecordReaderForExternalFormat(queryModel, readSupport,
    +          
configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
    +    } else if (inputSplit instanceof CarbonInputSplit
    +        && ((CarbonInputSplit) inputSplit).getFileFormat() == 
FileFormat.EXTERNAL) {
    +      return createRecordReaderForExternalFormat(queryModel, readSupport,
    +          
configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
    +    } else {
    +      return new CarbonRecordReader<T>(queryModel, readSupport);
    +    }
    +  }
    +
    +  @Since("1.4.1")
    --- End diff --
    
    OK


---

Reply via email to