GitHub user Indhumathi27 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2963#discussion_r238152299
  
    --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.datamap.minmax;
    +
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.util.List;
    +
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datamap.Segment;
    +import org.apache.carbondata.core.datamap.dev.DataMapWriter;
    +import org.apache.carbondata.core.datastore.impl.FileFactory;
    +import org.apache.carbondata.core.datastore.page.ColumnPage;
    +import 
org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert;
    +import 
org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
    +import 
org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
    +import 
org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.datatype.DataTypes;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.core.util.DataTypeUtil;
    +
    +import org.apache.log4j.Logger;
    +
    +/**
    + * We will record the min & max value for each index column in each blocklet.
    + * Since the size of the index is quite small, we will combine the index for all index columns
    + * in one file.
    + */
    +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter {
    +  private static final Logger LOGGER = LogServiceFactory.getLogService(
    +      AbstractMinMaxDataMapWriter.class.getName());
    +
    +  private ColumnPageStatsCollector[] indexColumnMinMaxCollectors;
    +  protected int currentBlockletId;
    +  private String currentIndexFile;
    +  private DataOutputStream currentIndexFileOutStream;
    +
    +  public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName,
    +      List<CarbonColumn> indexColumns, Segment segment, String shardName) 
throws IOException {
    +    super(tablePath, dataMapName, indexColumns, segment, shardName);
    +    initStatsCollector();
    +    initDataMapFile();
    +  }
    +
    +  private void initStatsCollector() {
    +    indexColumnMinMaxCollectors = new 
ColumnPageStatsCollector[indexColumns.size()];
    +    CarbonColumn indexCol;
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexCol = indexColumns.get(i);
    +      if (indexCol.isMeasure()
    +          || (indexCol.isDimension()
    +          && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType())
    +          && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +          && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +        indexColumnMinMaxCollectors[i] = 
PrimitivePageStatsCollector.newInstance(
    +            indexColumns.get(i).getDataType());
    +      } else {
    +        indexColumnMinMaxCollectors[i] = 
KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    +      }
    +    }
    +  }
    +
    +  private void initDataMapFile() throws IOException {
    +    if (!FileFactory.isFileExist(dataMapPath) &&
    +        !FileFactory.mkdirs(dataMapPath, 
FileFactory.getFileType(dataMapPath))) {
    +      throw new IOException("Failed to create directory " + dataMapPath);
    +    }
    +
    +    try {
    +      currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath,
    +          MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size());
    +      FileFactory.createNewFile(currentIndexFile, 
FileFactory.getFileType(currentIndexFile));
    +      currentIndexFileOutStream = 
FileFactory.getDataOutputStream(currentIndexFile,
    +          FileFactory.getFileType(currentIndexFile));
    +    } catch (IOException e) {
    +      CarbonUtil.closeStreams(currentIndexFileOutStream);
    +      LOGGER.error("Failed to init datamap index file", e);
    +      throw e;
    +    }
    +  }
    +
    +  protected void resetBlockletLevelMinMax() {
    +    for (int i = 0; i < indexColumns.size(); i++) {
    +      indexColumnMinMaxCollectors[i].getPageStats().clear();
    +    }
    +  }
    +
    +  @Override
    +  public void onBlockStart(String blockId) {
    +  }
    +
    +  @Override
    +  public void onBlockEnd(String blockId) {
    +  }
    +
    +  @Override public void onBlockletStart(int blockletId) {
    +  }
    +
    +  @Override public void onBlockletEnd(int blockletId) {
    +    flushMinMaxIndexFile();
    +    currentBlockletId++;
    +  }
    +
    +  @Override
    +  public void onPageAdded(int blockletId, int pageId, int pageSize, 
ColumnPage[] pages) {
    +    // as an example, we don't use the page-level min-max generated by native carbondata here;
    +    // instead, we get the min-max by comparing each row
    +    for (int rowId = 0; rowId < pageSize; rowId++) {
    +      for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) {
    +        Object originValue = pages[colIdx].getData(rowId);
    +        updateBlockletMinMax(colIdx, originValue);
    +      }
    +    }
    +  }
    +
    +  protected void updateBlockletMinMax(int indexColIdx, Object value) {
    +    if (null == value) {
    +      indexColumnMinMaxCollectors[indexColIdx].updateNull(0);
    +      return;
    +    }
    +
    +    CarbonColumn indexCol = indexColumns.get(indexColIdx);
    +    DataType dataType = indexCol.getDataType();
    +    if (indexCol.isMeasure()
    +        || (indexCol.isDimension()
    +        && DataTypeUtil.isPrimitiveColumn(dataType)
    +        && !indexCol.hasEncoding(Encoding.DICTIONARY)
    +        && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) {
    +      if (DataTypes.BOOLEAN == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update(
    +            BooleanConvert.boolean2Byte((boolean) value));
    +      } else if (DataTypes.SHORT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((short) value);
    +      } else if (DataTypes.INT == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((int) value);
    +      } else if (DataTypes.LONG == dataType || DataTypes.TIMESTAMP == 
dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((long) value);
    +      } else if (DataTypes.DOUBLE == dataType) {
    +        indexColumnMinMaxCollectors[indexColIdx].update((double) value);
    +      } else if (DataTypes.isDecimal(dataType)) {
    --- End diff --
    
    Please also handle the DataTypes.FLOAT data type in this if/else chain.


---

Reply via email to