Github user Indhumathi27 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2963#discussion_r238152299 --- Diff: datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/AbstractMinMaxDataMapWriter.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.minmax; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.bool.BooleanConvert; +import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; + +import org.apache.log4j.Logger; + +/** + * We will record the min & max value for each index column in each blocklet. + * Since the size of index is quite small, we will combine the index for all index columns + * in one file. 
+ */ +public abstract class AbstractMinMaxDataMapWriter extends DataMapWriter { + private static final Logger LOGGER = LogServiceFactory.getLogService( + AbstractMinMaxDataMapWriter.class.getName()); + + private ColumnPageStatsCollector[] indexColumnMinMaxCollectors; + protected int currentBlockletId; + private String currentIndexFile; + private DataOutputStream currentIndexFileOutStream; + + public AbstractMinMaxDataMapWriter(String tablePath, String dataMapName, + List<CarbonColumn> indexColumns, Segment segment, String shardName) throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName); + initStatsCollector(); + initDataMapFile(); + } + + private void initStatsCollector() { + indexColumnMinMaxCollectors = new ColumnPageStatsCollector[indexColumns.size()]; + CarbonColumn indexCol; + for (int i = 0; i < indexColumns.size(); i++) { + indexCol = indexColumns.get(i); + if (indexCol.isMeasure() + || (indexCol.isDimension() + && DataTypeUtil.isPrimitiveColumn(indexCol.getDataType()) + && !indexCol.hasEncoding(Encoding.DICTIONARY) + && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) { + indexColumnMinMaxCollectors[i] = PrimitivePageStatsCollector.newInstance( + indexColumns.get(i).getDataType()); + } else { + indexColumnMinMaxCollectors[i] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY); + } + } + } + + private void initDataMapFile() throws IOException { + if (!FileFactory.isFileExist(dataMapPath) && + !FileFactory.mkdirs(dataMapPath, FileFactory.getFileType(dataMapPath))) { + throw new IOException("Failed to create directory " + dataMapPath); + } + + try { + currentIndexFile = MinMaxIndexDataMap.getIndexFile(dataMapPath, + MinMaxIndexHolder.MINMAX_INDEX_PREFFIX + indexColumns.size()); + FileFactory.createNewFile(currentIndexFile, FileFactory.getFileType(currentIndexFile)); + currentIndexFileOutStream = FileFactory.getDataOutputStream(currentIndexFile, + FileFactory.getFileType(currentIndexFile)); + } catch (IOException e) { + 
CarbonUtil.closeStreams(currentIndexFileOutStream); + LOGGER.error("Failed to init datamap index file", e); + throw e; + } + } + + protected void resetBlockletLevelMinMax() { + for (int i = 0; i < indexColumns.size(); i++) { + indexColumnMinMaxCollectors[i].getPageStats().clear(); + } + } + + @Override + public void onBlockStart(String blockId) { + } + + @Override + public void onBlockEnd(String blockId) { + } + + @Override public void onBlockletStart(int blockletId) { + } + + @Override public void onBlockletEnd(int blockletId) { + flushMinMaxIndexFile(); + currentBlockletId++; + } + + @Override + public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) { + // as an example, we don't use page-level min-max generated by native carbondata here, we get + // the min-max by comparing each row + for (int rowId = 0; rowId < pageSize; rowId++) { + for (int colIdx = 0; colIdx < indexColumns.size(); colIdx++) { + Object originValue = pages[colIdx].getData(rowId); + updateBlockletMinMax(colIdx, originValue); + } + } + } + + protected void updateBlockletMinMax(int indexColIdx, Object value) { + if (null == value) { + indexColumnMinMaxCollectors[indexColIdx].updateNull(0); + return; + } + + CarbonColumn indexCol = indexColumns.get(indexColIdx); + DataType dataType = indexCol.getDataType(); + if (indexCol.isMeasure() + || (indexCol.isDimension() + && DataTypeUtil.isPrimitiveColumn(dataType) + && !indexCol.hasEncoding(Encoding.DICTIONARY) + && !indexCol.hasEncoding(Encoding.DIRECT_DICTIONARY))) { + if (DataTypes.BOOLEAN == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update( + BooleanConvert.boolean2Byte((boolean) value)); + } else if (DataTypes.SHORT == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((short) value); + } else if (DataTypes.INT == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((int) value); + } else if (DataTypes.LONG == dataType || DataTypes.TIMESTAMP == dataType) { + 
indexColumnMinMaxCollectors[indexColIdx].update((long) value); + } else if (DataTypes.DOUBLE == dataType) { + indexColumnMinMaxCollectors[indexColIdx].update((double) value); + } else if (DataTypes.isDecimal(dataType)) { --- End diff -- Please also handle the DataTypes.FLOAT data type here.
---