Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2624#discussion_r210213202
  
    --- Diff: 
datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomIndexFileStore.java
 ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.datamap.bloom;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.DataInputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.fileoperations.FileWriteOperation;
    +import org.apache.carbondata.core.reader.ThriftReader;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.writer.ThriftWriter;
    +import org.apache.carbondata.format.MergedBloomIndex;
    +import org.apache.carbondata.format.MergedBloomIndexHeader;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.util.bloom.CarbonBloomFilter;
    +import org.apache.thrift.TBase;
    +
    +public class BloomIndexFileStore {
    +
    +  private static final LogService LOGGER =
    +          
LogServiceFactory.getLogService(BloomIndexFileStore.class.getName());
    +
    +  /*suffix of original generated file*/
    +  public static final String BLOOM_INDEX_SUFFIX = ".bloomindex";
    +  /*suffix of merged bloom index file*/
    +  public static final String MERGE_BLOOM_INDEX_SUFFIX = ".bloomindexmerge";
    +  /* directory to store merged bloom index files */
    +  public static final String MERGE_BLOOM_INDEX_SHARD_NAME = "mergeShard";
    +  /**
    +   * flag file for merging
    +   * if flag file exists, query won't use mergeShard
    +   * if flag file not exists and mergeShard generated, query will use 
mergeShard
    +   */
    +  public static final String MERGE_INPROGRESS_FILE = 
"mergeShard.inprogress";
    +
    +
    +  public static void mergeBloomIndexFile(String dmSegmentPathString, 
List<String> indexCols) {
    +    // get all shard paths of old store
    +    CarbonFile segmentPath = FileFactory.getCarbonFile(dmSegmentPathString,
    +            FileFactory.getFileType(dmSegmentPathString));
    +    CarbonFile[] shardPaths = segmentPath.listFiles(new CarbonFileFilter() 
{
    +      @Override
    +      public boolean accept(CarbonFile file) {
    +        return file.isDirectory() && 
!file.getName().equals(MERGE_BLOOM_INDEX_SHARD_NAME);
    +      }
    +    });
    +
    +    String mergeShardPath = dmSegmentPathString + File.separator + 
MERGE_BLOOM_INDEX_SHARD_NAME;
    +    String mergeInprofressFile = dmSegmentPathString + File.separator + 
MERGE_INPROGRESS_FILE;
    +    try {
    +      // delete mergeShard folder if exists
    +      if (FileFactory.isFileExist(mergeShardPath)) {
    +        FileFactory.deleteFile(mergeShardPath, 
FileFactory.getFileType(mergeShardPath));
    +      }
    +      // create flag file before creating mergeShard folder
    +      if (!FileFactory.isFileExist(mergeInprofressFile)) {
    +        FileFactory.createNewFile(
    +            mergeInprofressFile, 
FileFactory.getFileType(mergeInprofressFile));
    +      }
    +      // prepare mergeShard output folder
    +      if (!FileFactory.mkdirs(mergeShardPath, 
FileFactory.getFileType(mergeShardPath))) {
    +        throw new RuntimeException("Failed to create directory " + 
mergeShardPath);
    +      }
    +    } catch (IOException e) {
    +      LOGGER.error(e, "Error occurs while create directory " + 
mergeShardPath);
    +      throw new RuntimeException("Error occurs while create directory " + 
mergeShardPath);
    +    }
    +
    +    // for each index column, merge the bloomindex files from all shards 
into one
    +    for (String indexCol: indexCols) {
    +      MergedBloomIndexHeader indexHeader = new MergedBloomIndexHeader();
    +      MergedBloomIndex mergedBloomIndex = new MergedBloomIndex();
    +      List<String> shardNames = new ArrayList<>();
    +      List<ByteBuffer> data = new ArrayList<>();
    +      try {
    +        for (CarbonFile shardPath : shardPaths) {
    +          String bloomIndexFile = 
getBloomIndexFile(shardPath.getCanonicalPath(), indexCol);
    +          DataInputStream dataInputStream = FileFactory.getDataInputStream(
    +                  bloomIndexFile, FileFactory.getFileType(bloomIndexFile));
    +          byte[] bytes = new byte[(int) 
FileFactory.getCarbonFile(bloomIndexFile).getSize()];
    +          try {
    +            dataInputStream.readFully(bytes);
    +            shardNames.add(shardPath.getName());
    +            data.add(ByteBuffer.wrap(bytes));
    +          } finally {
    +            dataInputStream.close();
    +          }
    +        }
    +        indexHeader.setShard_names(shardNames);
    +        mergedBloomIndex.setFileData(data);
    +        // write segment level file
    +        String mergeIndexFileName = getMergeBloomIndexFile(mergeShardPath, 
indexCol);
    +        ThriftWriter thriftWriter = new ThriftWriter(mergeIndexFileName, 
false);
    +        thriftWriter.open(FileWriteOperation.OVERWRITE);
    +        thriftWriter.write(indexHeader);
    +        thriftWriter.write(mergedBloomIndex);
    +        thriftWriter.close();
    +      } catch (IOException e) {
    +        LOGGER.error(e, "Error occurs while merge bloom index file of 
column: " + indexCol);
    +        // delete merge shard of bloom index for this segment when failed
    +        
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(mergeShardPath));
    +        throw new RuntimeException(
    +                "Error occurs while merge bloom index file of column: " + 
indexCol);
    +      }
    +    }
    +    // delete flag file and mergeShard can be used
    +    try {
    +      FileFactory.deleteFile(mergeInprofressFile, 
FileFactory.getFileType(mergeInprofressFile));
    +    } catch (IOException e) {
    +      LOGGER.error(e, "Error occurs while deleting file " + 
mergeInprofressFile);
    +      throw new RuntimeException("Error occurs while deleting file " + 
mergeInprofressFile);
    +    }
    +    // remove old store
    +    for (CarbonFile shardpath: shardPaths) {
    +      FileFactory.deleteAllCarbonFilesOfDir(shardpath);
    +    }
    +  }
    +
    +  /**
    +   * load bloom filter from bloom index file
    +   * @param shardPath
    --- End diff --
    
    Please remove these lines if they do not provide any additional information.


---

Reply via email to