Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2200#discussion_r183203455
  
    --- Diff: 
datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
 ---
    @@ -0,0 +1,243 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.bloom;
    +
    +import java.io.DataInputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.UnsupportedEncodingException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.datamap.dev.DataMapModel;
    +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.indexstore.Blocklet;
    +import org.apache.carbondata.core.indexstore.PartitionSpec;
    +import org.apache.carbondata.core.memory.MemoryException;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.scan.expression.ColumnExpression;
    +import org.apache.carbondata.core.scan.expression.Expression;
    +import org.apache.carbondata.core.scan.expression.LiteralExpression;
    +import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.Multimap;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.PathFilter;
    +
    +public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
    +  private static final LogService LOGGER =
    +      
LogServiceFactory.getLogService(BloomCoarseGrainDataMap.class.getName());
    +  private String[] indexFilePath;
    +  private Set<String> indexedColumn;
    +  private List<BloomDMModel> bloomIndexList;
    +  private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList;
    +
    +  @Override
    +  public void init(DataMapModel dataMapModel) throws MemoryException, 
IOException {
    +    Path indexPath = FileFactory.getPath(dataMapModel.getFilePath());
    +    FileSystem fs = FileFactory.getFileSystem(indexPath);
    +    if (!fs.exists(indexPath)) {
    +      throw new IOException(
    +          String.format("Path %s for Bloom index dataMap does not exist", 
indexPath));
    +    }
    +    if (!fs.isDirectory(indexPath)) {
    +      throw new IOException(
    +          String.format("Path %s for Bloom index dataMap must be a 
directory", indexPath));
    +    }
    +
    +    FileStatus[] indexFileStatus = fs.listStatus(indexPath, new 
PathFilter() {
    +      @Override public boolean accept(Path path) {
    +        return path.getName().endsWith(".bloomindex");
    +      }
    +    });
    +    indexFilePath = new String[indexFileStatus.length];
    +    indexedColumn = new HashSet<String>();
    +    bloomIndexList = new ArrayList<BloomDMModel>();
    +    indexCol2BloomDMList = ArrayListMultimap.create();
    +    for (int i = 0; i < indexFileStatus.length; i++) {
    +      indexFilePath[i] = indexFileStatus[i].getPath().toString();
    +      String indexCol = StringUtils.substringBetween(indexFilePath[i], 
".carbondata.",
    +          ".bloomindex");
    +      indexedColumn.add(indexCol);
    +      bloomIndexList.addAll(readBloomIndex(indexFilePath[i]));
    +      indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i]));
    +    }
    +    LOGGER.info("find bloom index datamap for column: "
    +        + StringUtils.join(indexedColumn, ", "));
    +  }
    +
    +  private List<BloomDMModel> readBloomIndex(String indexFile) throws 
IOException {
    +    LOGGER.info("read bloom index from file: " + indexFile);
    +    List<BloomDMModel> bloomDMModelList = new ArrayList<BloomDMModel>();
    +    DataInputStream dataInStream = null;
    +    ObjectInputStream objectInStream = null;
    +    try {
    +      dataInStream = FileFactory.getDataInputStream(indexFile, 
FileFactory.getFileType(indexFile));
    +      objectInStream = new ObjectInputStream(dataInStream);
    +      try {
    +        BloomDMModel model = null;
    +        while ((model = (BloomDMModel) objectInStream.readObject()) != 
null) {
    +          LOGGER.info("read bloom index: " + model);
    +          bloomDMModelList.add(model);
    +        }
    +      } catch (EOFException e) {
    +        LOGGER.info("read " + bloomDMModelList.size() + " bloom indices 
from " + indexFile);
    +      }
    +      return bloomDMModelList;
    +    } catch (ClassNotFoundException e) {
    +      LOGGER.error("Error occrus while reading bloom index");
    +      throw new RuntimeException("Error occrus while reading bloom index", 
e);
    +    } finally {
    +      CarbonUtil.closeStreams(objectInStream, dataInStream);
    +    }
    +  }
    +
    +  @Override
    +  public List<Blocklet> prune(FilterResolverIntf filterExp, 
SegmentProperties segmentProperties,
    +      List<PartitionSpec> partitions) throws IOException {
    +    List<Blocklet> hitBlocklets = new ArrayList<Blocklet>();
    +    if (filterExp == null) {
    +      return null;
    --- End diff --
    
    Oh, then we should document it clearly somewhere; otherwise it is confusing.


---

Reply via email to