Github user xuchuanyin commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2254#discussion_r185238634
  
    --- Diff: 
datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
 ---
    @@ -21,142 +21,124 @@
     import java.io.IOException;
     import java.io.ObjectOutputStream;
     import java.util.ArrayList;
    -import java.util.HashMap;
     import java.util.List;
    -import java.util.Map;
     
     import org.apache.carbondata.common.annotations.InterfaceAudience;
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
     import org.apache.carbondata.core.constants.CarbonCommonConstants;
    -import org.apache.carbondata.core.datamap.DataMapMeta;
     import org.apache.carbondata.core.datamap.Segment;
     import org.apache.carbondata.core.datamap.dev.DataMapWriter;
     import org.apache.carbondata.core.datastore.impl.FileFactory;
    -import org.apache.carbondata.core.datastore.page.ColumnPage;
    -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    +import org.apache.carbondata.core.datastore.row.CarbonRow;
     import org.apache.carbondata.core.metadata.datatype.DataType;
     import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
     import org.apache.carbondata.core.util.CarbonUtil;
     
     import com.google.common.hash.BloomFilter;
     import com.google.common.hash.Funnels;
     
     /**
    - * BloomDataMap is constructed in blocklet level. For each indexed column, 
a bloom filter is
    - * constructed to indicate whether a value belongs to this blocklet. Bloom 
filter of blocklet that
    - * belongs to same block will be written to one index file suffixed with 
.bloomindex. So the number
    + * BloomDataMap is constructed in CG level (blocklet level).
    + * For each indexed column, a bloom filter is constructed to indicate 
whether a value
    + * belongs to this blocklet. Bloom filter of blocklet that belongs to same 
block will
    + * be written to one index file suffixed with .bloomindex. So the number
      * of bloom index file will be equal to that of the blocks.
      */
     @InterfaceAudience.Internal
     public class BloomDataMapWriter extends DataMapWriter {
    -  private String dataMapName;
    -  private List<String> indexedColumns;
    +  private static final LogService LOG = LogServiceFactory.getLogService(
    +      BloomDataMapWriter.class.getCanonicalName());
       private int bloomFilterSize;
    -  // map column name to ordinal in pages
    -  private Map<String, Integer> col2Ordianl;
    -  private Map<String, DataType> col2DataType;
    -  private String indexShardName;
    -  private int currentBlockletId;
    +  protected int currentBlockletId;
       private List<String> currentDMFiles;
       private List<DataOutputStream> currentDataOutStreams;
       private List<ObjectOutputStream> currentObjectOutStreams;
       private List<BloomFilter<byte[]>> indexBloomFilters;
     
    -  @InterfaceAudience.Internal
    -  public BloomDataMapWriter(AbsoluteTableIdentifier identifier, 
DataMapMeta dataMapMeta,
    -      int bloomFilterSize, Segment segment, String writeDirectoryPath) {
    -    super(identifier, segment, writeDirectoryPath);
    -    dataMapName = dataMapMeta.getDataMapName();
    -    indexedColumns = dataMapMeta.getIndexedColumns();
    +  BloomDataMapWriter(String tablePath, String dataMapName, 
List<CarbonColumn> indexColumns,
    +      Segment segment, String shardName, int bloomFilterSize) throws 
IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
         this.bloomFilterSize = bloomFilterSize;
    -    col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
    -    col2DataType = new HashMap<String, DataType>(indexedColumns.size());
     
    -    currentDMFiles = new ArrayList<String>(indexedColumns.size());
    -    currentDataOutStreams = new 
ArrayList<DataOutputStream>(indexedColumns.size());
    -    currentObjectOutStreams = new 
ArrayList<ObjectOutputStream>(indexedColumns.size());
    -
    -    indexBloomFilters = new 
ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
    +    currentDMFiles = new ArrayList<String>(indexColumns.size());
    +    currentDataOutStreams = new 
ArrayList<DataOutputStream>(indexColumns.size());
    +    currentObjectOutStreams = new 
ArrayList<ObjectOutputStream>(indexColumns.size());
    +    indexBloomFilters = new 
ArrayList<BloomFilter<byte[]>>(indexColumns.size());
    +    initDataMapFile();
    +    resetBloomFilters();
       }
     
       @Override
    -  public void onBlockStart(String blockId, String indexShardName) throws 
IOException {
    -    if (this.indexShardName == null) {
    -      this.indexShardName = indexShardName;
    -      initDataMapFile();
    -    }
    +  public void onBlockStart(String blockId) throws IOException {
       }
     
       @Override
       public void onBlockEnd(String blockId) throws IOException {
    -
       }
     
       @Override
       public void onBlockletStart(int blockletId) {
    -    this.currentBlockletId = blockletId;
    +  }
    +
    +  protected void resetBloomFilters() {
         indexBloomFilters.clear();
    -    for (int i = 0; i < indexedColumns.size(); i++) {
    +    List<CarbonColumn> indexColumns = getIndexColumns();
    +    for (int i = 0; i < indexColumns.size(); i++) {
           indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
               bloomFilterSize, 0.00001d));
         }
       }
     
       @Override
       public void onBlockletEnd(int blockletId) {
    -    try {
    -      writeBloomDataMapFile();
    -    } catch (Exception e) {
    -      for (ObjectOutputStream objectOutputStream : 
currentObjectOutStreams) {
    -        CarbonUtil.closeStreams(objectOutputStream);
    -      }
    -      for (DataOutputStream dataOutputStream : currentDataOutStreams) {
    -        CarbonUtil.closeStreams(dataOutputStream);
    -      }
    -      throw new RuntimeException(e);
    -    }
    +    writeBloomDataMapFile();
    +    currentBlockletId++;
       }
     
    -  // notice that the input pages only contains the indexed columns
       @Override
    -  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
    -      throws IOException {
    -    col2Ordianl.clear();
    -    col2DataType.clear();
    -    for (int colId = 0; colId < pages.length; colId++) {
    -      String columnName = 
pages[colId].getColumnSpec().getFieldName().toLowerCase();
    -      col2Ordianl.put(columnName, colId);
    -      DataType columnType = 
pages[colId].getColumnSpec().getSchemaDataType();
    -      col2DataType.put(columnName, columnType);
    -    }
    +  public void addRow(int blockletId, int pageId, int rowId, CarbonRow row) 
{
    +    addRow(row.getData());
    +  }
     
    -    // for each row
    -    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
    -      // for each indexed column
    -      for (int indexColId = 0; indexColId < indexedColumns.size(); 
indexColId++) {
    -        String indexedCol = indexedColumns.get(indexColId);
    -        byte[] indexValue;
    -        if (DataTypes.STRING == col2DataType.get(indexedCol)
    -            || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
    -          byte[] originValue = (byte[]) 
pages[col2Ordianl.get(indexedCol)].getData(rowId);
    -          indexValue = new byte[originValue.length - 2];
    -          System.arraycopy(originValue, 2, indexValue, 0, 
originValue.length - 2);
    -        } else {
    -          Object originValue = 
pages[col2Ordianl.get(indexedCol)].getData(rowId);
    -          indexValue = 
CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
    -        }
    -
    -        indexBloomFilters.get(indexColId).put(indexValue);
    +  protected void addRow(Object[] rowData) {
    +    // for each indexed column, add the data to bloom filter
    +    List<CarbonColumn> indexColumns = getIndexColumns();
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      DataType dataType = indexColumns.get(i).getDataType();
    +      byte[] indexValue;
    +      if (DataTypes.STRING == dataType) {
    --- End diff --
    
    The procedures for string and byte_array are the same; there is no need to 
distinguish them.


---

Reply via email to