Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2644#discussion_r214889642
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -339,63 +338,62 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
     return filteredSegmentToAccess;
   }
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable) throws IOException {
+    return getSplitsOfStreaming(job, streamSegments, carbonTable, null);
+  }
+
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<Segment> streamSegments) throws IOException {
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
       numStreamSegments = streamSegments.size();
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
-      for (Segment segment : streamSegments) {
-        String segmentDir =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
-        if (FileFactory.isFileExist(segmentDir, fileType)) {
-          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
-          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
-          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
-          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-          for (byte[] fileData : carbonIndexMap.values()) {
-            indexReader.openThriftReader(fileData);
-            try {
-              // map block index
-              while (indexReader.hasNext()) {
-                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
-                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
-                Path path = new Path(filePath);
-                long length = blockIndex.getFile_size();
-                if (length != 0) {
-                  BlockLocation[] blkLocations;
-                  FileSystem fs = FileFactory.getFileSystem(path);
-                  FileStatus file = fs.getFileStatus(path);
-                  blkLocations = fs.getFileBlockLocations(path, 0, length);
-                  long blockSize = file.getBlockSize();
-                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-                  long bytesRemaining = length;
-                  while (((double) bytesRemaining) / splitSize > 1.1) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        splitSize, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                    bytesRemaining -= splitSize;
-                  }
-                  if (bytesRemaining != 0) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        bytesRemaining, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                  }
-                } else {
-                  //Create empty hosts array for zero length files
-                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
-                      FileFormat.ROW_V1));
-                }
-              }
-            } finally {
-              indexReader.closeThriftReader();
+      if (filterResolverIntf == null) {
+        if (carbonTable != null) {
+          Expression filter = getFilterPredicates(job.getConfiguration());
+          if (filter != null) {
+            carbonTable.processFilterExpression(filter, null, null);
+            filterResolverIntf = carbonTable.resolveFilter(filter);
+          }
+        }
+      }
+      StreamDataMap streamDataMap =
+          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
+      streamDataMap.init(filterResolverIntf);
+      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
+
+      for (StreamFile streamFile : streamFiles) {
+        if (FileFactory.isFileExist(streamFile.getFilePath())) {
--- End diff ---
fixed
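
For context on the loop this patch keeps: each pruned stream file is carved into input splits with Hadoop-style size heuristics. Below is a minimal, self-contained sketch of just that computation. It is not CarbonData code: SplitSizeSketch and planSplits are hypothetical names for illustration, while computeSplitSize and the 1.1 slack factor mirror what Hadoop's FileInputFormat does.

import java.util.ArrayList;
import java.util.List;

public class SplitSizeSketch {

  // Hadoop's formula: clamp the file system block size into [minSize, maxSize].
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  // Returns {offset, length} pairs for one file of `length` bytes, analogous
  // to the splits.add(makeSplit(...)) calls in the method above.
  static List<long[]> planSplits(long length, long blockSize, long minSize, long maxSize) {
    List<long[]> splits = new ArrayList<>();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long bytesRemaining = length;
    // Emit full-size splits while more than 1.1x a split remains. The slack
    // factor lets a tail of up to 1.1x splitSize become one final split
    // instead of a full split followed by a tiny one.
    while (((double) bytesRemaining) / splitSize > 1.1) {
      splits.add(new long[] { length - bytesRemaining, splitSize });
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits.add(new long[] { length - bytesRemaining, bytesRemaining });
    }
    return splits;
  }

  public static void main(String[] args) {
    // A 300 MB file with 128 MB blocks yields offset/length pairs of
    // 0/128, 128/128, 256/44 (in MB): the 44 MB tail is one split because
    // it is below the 1.1x threshold.
    long mb = 1024L * 1024;
    for (long[] s : planSplits(300 * mb, 128 * mb, 1, Long.MAX_VALUE)) {
      System.out.println("offset=" + (s[0] / mb) + "MB length=" + (s[1] / mb) + "MB");
    }
  }
}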
---