Github user QiangCai commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2644#discussion_r214889642
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
    @@ -339,63 +338,62 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
         return filteredSegmentToAccess;
       }
     
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable) throws IOException {
    +    return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
    +  }
    +
       /**
        * use file list in .carbonindex file to get the split of streaming.
        */
    -  public List<InputSplit> getSplitsOfStreaming(JobContext job, 
AbsoluteTableIdentifier identifier,
    -      List<Segment> streamSegments) throws IOException {
    +  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
    +      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
         List<InputSplit> splits = new ArrayList<InputSplit>();
         if (streamSegments != null && !streamSegments.isEmpty()) {
           numStreamSegments = streamSegments.size();
           long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
           long maxSize = getMaxSplitSize(job);
    -      for (Segment segment : streamSegments) {
    -        String segmentDir =
    -            CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
segment.getSegmentNo());
    -        FileFactory.FileType fileType = 
FileFactory.getFileType(segmentDir);
    -        if (FileFactory.isFileExist(segmentDir, fileType)) {
    -          SegmentIndexFileStore segmentIndexFileStore = new 
SegmentIndexFileStore();
    -          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
    -          Map<String, byte[]> carbonIndexMap = 
segmentIndexFileStore.getCarbonIndexMap();
    -          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    -          for (byte[] fileData : carbonIndexMap.values()) {
    -            indexReader.openThriftReader(fileData);
    -            try {
    -              // map block index
    -              while (indexReader.hasNext()) {
    -                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
    -                String filePath = segmentDir + File.separator + 
blockIndex.getFile_name();
    -                Path path = new Path(filePath);
    -                long length = blockIndex.getFile_size();
    -                if (length != 0) {
    -                  BlockLocation[] blkLocations;
    -                  FileSystem fs = FileFactory.getFileSystem(path);
    -                  FileStatus file = fs.getFileStatus(path);
    -                  blkLocations = fs.getFileBlockLocations(path, 0, length);
    -                  long blockSize = file.getBlockSize();
    -                  long splitSize = computeSplitSize(blockSize, minSize, 
maxSize);
    -                  long bytesRemaining = length;
    -                  while (((double) bytesRemaining) / splitSize > 1.1) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, 
length - bytesRemaining,
    -                        splitSize, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), 
FileFormat.ROW_V1));
    -                    bytesRemaining -= splitSize;
    -                  }
    -                  if (bytesRemaining != 0) {
    -                    int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    -                    splits.add(makeSplit(segment.getSegmentNo(), path, 
length - bytesRemaining,
    -                        bytesRemaining, blkLocations[blkIndex].getHosts(),
    -                        blkLocations[blkIndex].getCachedHosts(), 
FileFormat.ROW_V1));
    -                  }
    -                } else {
    -                  //Create empty hosts array for zero length files
    -                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, 
length, new String[0],
    -                      FileFormat.ROW_V1));
    -                }
    -              }
    -            } finally {
    -              indexReader.closeThriftReader();
    +      if (filterResolverIntf == null) {
    +        if (carbonTable != null) {
    +          Expression filter = getFilterPredicates(job.getConfiguration());
    +          if (filter != null) {
    +            carbonTable.processFilterExpression(filter, null, null);
    +            filterResolverIntf = carbonTable.resolveFilter(filter);
    +          }
    +        }
    +      }
    +      StreamDataMap streamDataMap =
    +          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
    +      streamDataMap.init(filterResolverIntf);
    +      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
    +
    +      for (StreamFile streamFile : streamFiles) {
    +        if (FileFactory.isFileExist(streamFile.getFilePath())) {
    --- End diff --
    
    fixed


---

Reply via email to