GitHub user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210211799
  
    --- Diff: datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---
    @@ -222,102 +220,95 @@ public DataMapBuilder createBuilder(Segment segment, 
String shardName,
     
       @Override
       public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws 
IOException {
    -    List<CoarseGrainDataMap> dataMaps = new 
ArrayList<CoarseGrainDataMap>(1);
    +    List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
         try {
           Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
           if (shardPaths == null) {
    -        String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
    -            getCarbonTable().getTablePath(), segment.getSegmentNo(), 
dataMapName);
    -        CarbonFile[] carbonFiles = 
FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    -        shardPaths = new HashSet<>();
    -        for (CarbonFile carbonFile : carbonFiles) {
    -          shardPaths.add(carbonFile.getAbsolutePath());
    -        }
    +        shardPaths = getAllShardPaths(getCarbonTable().getTablePath(), 
segment.getSegmentNo());
             segmentMap.put(segment.getSegmentNo(), shardPaths);
           }
    +      Set<String> filteredShards = segment.getFilteredIndexShardNames();
           for (String shard : shardPaths) {
    -        BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
    -        bloomDM.init(new BloomDataMapModel(shard, cache));
    -        bloomDM.initIndexColumnConverters(getCarbonTable(), 
dataMapMeta.getIndexedColumns());
    -        dataMaps.add(bloomDM);
    +        if 
(shard.endsWith(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME) ||
    +            filteredShards.contains(new File(shard).getName())) {
    +          // Filter out the tasks which are filtered through Main datamap.
    +          // for merge shard, shard pruning delay to be done before 
pruning blocklet
    +          BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
    +          bloomDM.init(new BloomDataMapModel(shard, cache));
    +          bloomDM.initIndexColumnConverters(getCarbonTable(), 
dataMapMeta.getIndexedColumns());
    +          bloomDM.setFilteredShard(filteredShards);
    +          dataMaps.add(bloomDM);
    +        }
           }
         } catch (Exception e) {
           throw new IOException("Error occurs while init Bloom DataMap", e);
         }
         return dataMaps;
       }
     
    -  @Override
    -  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable 
distributable)
    -      throws IOException {
    -    List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
    -    BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new 
BloomCoarseGrainDataMap();
    -    String indexPath = ((BloomDataMapDistributable) 
distributable).getIndexPath();
    -    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
    -    bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
    -        dataMapMeta.getIndexedColumns());
    -    coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
    -    return coarseGrainDataMaps;
    -  }
    -
       /**
    -   * returns all the directories of lucene index files for query
    -   * Note: copied from luceneDataMapFactory, will extract to a common 
interface
    +   * returns all shard directories of bloom index files for query
    +   * if bloom index files are merged we should get only one shard path
        */
    -  private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId) 
{
    -    List<CarbonFile> indexDirs = new ArrayList<>();
    -    List<TableDataMap> dataMaps;
    -    try {
    -      // there can be multiple bloom datamaps present on a table, so get 
all datamaps and form
    -      // the path till the index file directories in all datamaps folders 
present in each segment
    -      dataMaps = 
DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
    -    } catch (IOException ex) {
    -      LOGGER.error(ex, String.format("failed to get datamaps for tablePath 
%s, segmentId %s",
    -          tablePath, segmentId));
    -      throw new RuntimeException(ex);
    -    }
    -    if (dataMaps.size() > 0) {
    -      for (TableDataMap dataMap : dataMaps) {
    -        if 
(dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
    -          List<CarbonFile> indexFiles;
    -          String dmPath = CarbonTablePath.getDataMapStorePath(tablePath, 
segmentId,
    -              dataMap.getDataMapSchema().getDataMapName());
    -          FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
    -          final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, 
fileType);
    -          indexFiles = Arrays.asList(dirPath.listFiles(new 
CarbonFileFilter() {
    -            @Override
    -            public boolean accept(CarbonFile file) {
    -              return file.isDirectory();
    -            }
    -          }));
    -          indexDirs.addAll(indexFiles);
    +  private Set<String> getAllShardPaths(String tablePath, String segmentId) 
{
    +    String dataMapStorePath = CarbonTablePath.getDataMapStorePath(
    +            tablePath, segmentId, dataMapName);
    +    CarbonFile[] carbonFiles = 
FileFactory.getCarbonFile(dataMapStorePath).listFiles();
    +    Set<String> shardPaths = new HashSet<>();
    +    boolean mergeShardInprogress = false;
    +    CarbonFile mergeShardFile = null;
    +    for (CarbonFile carbonFile : carbonFiles) {
    +      if 
(carbonFile.getName().equals(BloomIndexFileStore.MERGE_BLOOM_INDEX_SHARD_NAME)) 
{
    +        mergeShardFile = carbonFile;
    +      } else if 
(carbonFile.getName().equals(BloomIndexFileStore.MERGE_INPROGRESS_FILE)) {
    +        mergeShardInprogress = true;
    +      } else {
    +        if (carbonFile.isDirectory()) {
    +          shardPaths.add(carbonFile.getAbsolutePath());
             }
           }
         }
    -    return indexDirs.toArray(new CarbonFile[0]);
    +    if (mergeShardFile != null && !mergeShardInprogress) {
    +      //should only get one shard path if mergeShard is generated 
successfully
    --- End diff --
    
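    Nit: please add a space between `//` and the comment text, i.e. `// should only get one shard path if mergeShard is generated successfully`.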


---

Reply via email to