Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2006#discussion_r171852959 --- Diff: core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java --- @@ -38,85 +46,158 @@ /** * Merge all the carbonindex files of segment to a merged file - * @param segmentPath + * @param tablePath * @param indexFileNamesTobeAdded while merging it comsiders only these files. * If null then consider all * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata * file. This will used in case of upgrade from version * which do not store the blocklet info to current version * @throws IOException */ - private void mergeCarbonIndexFilesOfSegment(String segmentPath, - List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile) - throws IOException { - CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + String tablePath, List<String> indexFileNamesTobeAdded, + boolean readFileFooterFromCarbonDataFile) throws IOException { + Segment segment = Segment.getSegment(segmentId, tablePath); + String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId); + CarbonFile[] indexFiles; + SegmentFileStore sfs = null; + if (segment != null && segment.getSegmentFileName() != null) { + sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName()); + List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles(); + indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]); + } else { + indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + } if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) { - SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); - if (readFileFooterFromCarbonDataFile) { - // this case will be used in case of upgrade where old store will not have the 
blocklet - // info in the index file and therefore blocklet info need to be read from the file footer - // in the carbondata file - fileStore.readAllIndexAndFillBolckletInfo(segmentPath); + if (sfs == null) { + return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile, + segmentPath, indexFiles); } else { - fileStore.readAllIIndexOfSegment(segmentPath); + return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles); } - Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap(); - MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader(); - MergedBlockIndex mergedBlockIndex = new MergedBlockIndex(); - List<String> fileNames = new ArrayList<>(indexMap.size()); - List<ByteBuffer> data = new ArrayList<>(indexMap.size()); - for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) { - if (indexFileNamesTobeAdded == null || - indexFileNamesTobeAdded.contains(entry.getKey())) { - fileNames.add(entry.getKey()); - data.add(ByteBuffer.wrap(entry.getValue())); - } + } + return null; + } + + + private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded, + boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles) + throws IOException { + SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); + if (readFileFooterFromCarbonDataFile) { + // this case will be used in case of upgrade where old store will not have the blocklet + // info in the index file and therefore blocklet info need to be read from the file footer + // in the carbondata file + fileStore.readAllIndexAndFillBolckletInfo(segmentPath); + } else { + fileStore.readAllIIndexOfSegment(segmentPath); + } + Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap(); + writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap); + for (CarbonFile indexFile : indexFiles) { + indexFile.delete(); --- End diff -- Deleting the index files here should be postponed, because a parallel read of those files can happen while the merge is running.
---