http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java new file mode 100644 index 0000000..db9c9be --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.core.carbon.datastore.block.BlockInfo; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo; +import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.ChunkCompressorMeta; +import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec; +import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk; +import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta; +import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex; +import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex; +import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex; +import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState; +import org.apache.carbondata.core.carbon.metadata.datatype.DataType; +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.ValueEncoderMeta; +import org.apache.carbondata.core.reader.CarbonIndexFileReader; +import org.apache.carbondata.format.BlockIndex; + +/** + * Footer reader class + */ +public abstract class AbstractDataFileFooterConverter { + + /** + * Below method will be used to convert the thrift presence meta to wrapper + * presence meta + * + * @param presentMetadataThrift + * @return wrapper presence meta + */ + private static PresenceMeta getPresenceMeta( + org.apache.carbondata.format.PresenceMeta 
presentMetadataThrift) { + PresenceMeta presenceMeta = new PresenceMeta(); + presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence()); + presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream())); + return presenceMeta; + } + + /** + * Below method will be used to get the index info from index file + * + * @param filePath file path of the index file + * @param tableBlockInfoList table block index + * @return list of index info + * @throws IOException problem while reading the index file + */ + public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList) + throws IOException, CarbonUtilException { + CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); + List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>(); + try { + // open the reader + indexReader.openThriftReader(filePath); + // get the index header + org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader(); + List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); + List<org.apache.carbondata.format.ColumnSchema> table_columns = + readIndexHeader.getTable_columns(); + for (int i = 0; i < table_columns.size(); i++) { + columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))); + } + // get the segment info + SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info()); + BlockletIndex blockletIndex = null; + int counter = 0; + DataFileFooter dataFileFooter = null; + // read the block info from file + while (indexReader.hasNext()) { + BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo(); + blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index()); + dataFileFooter = new DataFileFooter(); + TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++); + tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset()); + tableBlockInfo.setVersion((short) readIndexHeader.getVersion()); + 
int blockletSize = getBlockletSize(readBlockIndexInfo); + tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize); + dataFileFooter.setBlockletIndex(blockletIndex); + dataFileFooter.setColumnInTable(columnSchemaList); + dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows()); + dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo)); + dataFileFooter.setSegmentInfo(segmentInfo); + dataFileFooters.add(dataFileFooter); + } + } finally { + indexReader.closeThriftReader(); + } + return dataFileFooters; + } + + /** + * the methods returns the number of blocklets in a block + * + * @param readBlockIndexInfo + * @return + */ + protected int getBlockletSize(BlockIndex readBlockIndexInfo) { + long num_rows = readBlockIndexInfo.getNum_rows(); + int blockletSize = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)); + int remainder = (int) (num_rows % blockletSize); + int noOfBlockLet = (int) (num_rows / blockletSize); + // there could be some blocklets which will not + // contain the total records equal to the blockletSize + if (remainder > 0) { + noOfBlockLet = noOfBlockLet + 1; + } + return noOfBlockLet; + } + + /** + * Below method will be used to convert thrift file meta to wrapper file meta + */ + public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo) + throws IOException; + + /** + * Below method will be used to get blocklet index for data file meta + * + * @param blockletIndexList + * @return blocklet index + */ + protected BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> blockletIndexList) { + BlockletIndex blockletIndex = new BlockletIndex(); + BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex(); + blockletBTreeIndex.setStartKey(blockletIndexList.get(0).getBtreeIndex().getStartKey()); + blockletBTreeIndex + .setEndKey(blockletIndexList.get(blockletIndexList.size() - 
1).getBtreeIndex().getEndKey()); + blockletIndex.setBtreeIndex(blockletBTreeIndex); + byte[][] currentMinValue = blockletIndexList.get(0).getMinMaxIndex().getMinValues().clone(); + byte[][] currentMaxValue = blockletIndexList.get(0).getMinMaxIndex().getMaxValues().clone(); + byte[][] minValue = null; + byte[][] maxValue = null; + for (int i = 1; i < blockletIndexList.size(); i++) { + minValue = blockletIndexList.get(i).getMinMaxIndex().getMinValues(); + maxValue = blockletIndexList.get(i).getMinMaxIndex().getMaxValues(); + for (int j = 0; j < maxValue.length; j++) { + if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) { + currentMinValue[j] = minValue[j].clone(); + } + if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) { + currentMaxValue[j] = maxValue[j].clone(); + } + } + } + + BlockletMinMaxIndex minMax = new BlockletMinMaxIndex(); + minMax.setMaxValues(currentMaxValue); + minMax.setMinValues(currentMinValue); + blockletIndex.setMinMaxIndex(minMax); + return blockletIndex; + } + + protected ColumnSchema thriftColumnSchmeaToWrapperColumnSchema( + org.apache.carbondata.format.ColumnSchema externalColumnSchema) { + ColumnSchema wrapperColumnSchema = new ColumnSchema(); + wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id()); + wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name()); + wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar()); + wrapperColumnSchema + .setDataType(thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type)); + wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension()); + List<Encoding> encoders = new ArrayList<Encoding>(); + for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) { + encoders.add(fromExternalToWrapperEncoding(encoder)); + } + wrapperColumnSchema.setEncodingList(encoders); + wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child()); + 
wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision()); + wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id()); + wrapperColumnSchema.setScale(externalColumnSchema.getScale()); + wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value()); + wrapperColumnSchema.setAggregateFunction(externalColumnSchema.getAggregate_function()); + return wrapperColumnSchema; + } + + /** + * Below method is convert the thrift encoding to wrapper encoding + * + * @param encoderThrift thrift encoding + * @return wrapper encoding + */ + protected Encoding fromExternalToWrapperEncoding( + org.apache.carbondata.format.Encoding encoderThrift) { + switch (encoderThrift) { + case DICTIONARY: + return Encoding.DICTIONARY; + case DELTA: + return Encoding.DELTA; + case RLE: + return Encoding.RLE; + case INVERTED_INDEX: + return Encoding.INVERTED_INDEX; + case BIT_PACKED: + return Encoding.BIT_PACKED; + case DIRECT_DICTIONARY: + return Encoding.DIRECT_DICTIONARY; + default: + throw new IllegalArgumentException(encoderThrift.toString() + " is not supported"); + } + } + + /** + * Below method will be used to convert the thrift compression to wrapper + * compression codec + * + * @param compressionCodecThrift + * @return wrapper compression codec + */ + protected CompressionCodec getCompressionCodec( + org.apache.carbondata.format.CompressionCodec compressionCodecThrift) { + switch (compressionCodecThrift) { + case SNAPPY: + return CompressionCodec.SNAPPY; + default: + return CompressionCodec.SNAPPY; + } + } + + /** + * Below method will be used to convert thrift segment object to wrapper + * segment object + * + * @param segmentInfo thrift segment info object + * @return wrapper segment info object + */ + protected SegmentInfo getSegmentInfo(org.apache.carbondata.format.SegmentInfo segmentInfo) { + SegmentInfo info = new SegmentInfo(); + int[] cardinality = new int[segmentInfo.getColumn_cardinalities().size()]; + for (int i = 0; i < 
cardinality.length; i++) { + cardinality[i] = segmentInfo.getColumn_cardinalities().get(i); + } + info.setColumnCardinality(cardinality); + info.setNumberOfColumns(segmentInfo.getNum_cols()); + return info; + } + + /** + * Below method will be used to convert the blocklet index of thrift to + * wrapper + * + * @param blockletIndexThrift + * @return blocklet index wrapper + */ + protected BlockletIndex getBlockletIndex( + org.apache.carbondata.format.BlockletIndex blockletIndexThrift) { + org.apache.carbondata.format.BlockletBTreeIndex btreeIndex = + blockletIndexThrift.getB_tree_index(); + org.apache.carbondata.format.BlockletMinMaxIndex minMaxIndex = + blockletIndexThrift.getMin_max_index(); + return new BlockletIndex( + new BlockletBTreeIndex(btreeIndex.getStart_key(), btreeIndex.getEnd_key()), + new BlockletMinMaxIndex(minMaxIndex.getMin_values(), minMaxIndex.getMax_values())); + } + + /** + * Below method will be used to convert the thrift compression meta to + * wrapper chunk compression meta + * + * @param chunkCompressionMetaThrift + * @return chunkCompressionMetaWrapper + */ + protected ChunkCompressorMeta getChunkCompressionMeta( + org.apache.carbondata.format.ChunkCompressionMeta chunkCompressionMetaThrift) { + ChunkCompressorMeta compressorMeta = new ChunkCompressorMeta(); + compressorMeta + .setCompressor(getCompressionCodec(chunkCompressionMetaThrift.getCompression_codec())); + compressorMeta.setCompressedSize(chunkCompressionMetaThrift.getTotal_compressed_size()); + compressorMeta.setUncompressedSize(chunkCompressionMetaThrift.getTotal_uncompressed_size()); + return compressorMeta; + } + + /** + * Below method will be used to convert the thrift data type to wrapper data + * type + * + * @param dataTypeThrift + * @return dataType wrapper + */ + protected DataType thriftDataTyopeToWrapperDataType( + org.apache.carbondata.format.DataType dataTypeThrift) { + switch (dataTypeThrift) { + case STRING: + return DataType.STRING; + case SHORT: + return 
DataType.SHORT; + case INT: + return DataType.INT; + case LONG: + return DataType.LONG; + case DOUBLE: + return DataType.DOUBLE; + case DECIMAL: + return DataType.DECIMAL; + case TIMESTAMP: + return DataType.TIMESTAMP; + case ARRAY: + return DataType.ARRAY; + case STRUCT: + return DataType.STRUCT; + default: + return DataType.STRING; + } + } + + /** + * Below method will be used to convert the thrift object to wrapper object + * + * @param sortStateThrift + * @return wrapper sort state object + */ + protected SortState getSortState(org.apache.carbondata.format.SortState sortStateThrift) { + if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_EXPLICIT) { + return SortState.SORT_EXPLICT; + } else if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_NATIVE) { + return SortState.SORT_NATIVE; + } else { + return SortState.SORT_NONE; + } + } + + /** + * Below method will be used to convert the thrift data chunk to wrapper + * data chunk + * + * @param datachunkThrift + * @return wrapper data chunk + */ + protected DataChunk getDataChunk(org.apache.carbondata.format.DataChunk datachunkThrift, + boolean isPresenceMetaPresent) { + DataChunk dataChunk = new DataChunk(); + dataChunk.setColumnUniqueIdList(datachunkThrift.getColumn_ids()); + dataChunk.setDataPageLength(datachunkThrift.getData_page_length()); + dataChunk.setDataPageOffset(datachunkThrift.getData_page_offset()); + if (isPresenceMetaPresent) { + dataChunk.setNullValueIndexForColumn(getPresenceMeta(datachunkThrift.getPresence())); + } + dataChunk.setRlePageLength(datachunkThrift.getRle_page_length()); + dataChunk.setRlePageOffset(datachunkThrift.getRle_page_offset()); + dataChunk.setRowMajor(datachunkThrift.isRowMajor()); + dataChunk.setRowIdPageLength(datachunkThrift.getRowid_page_length()); + dataChunk.setRowIdPageOffset(datachunkThrift.getRowid_page_offset()); + dataChunk.setSortState(getSortState(datachunkThrift.getSort_state())); + 
dataChunk.setChunkCompressionMeta(getChunkCompressionMeta(datachunkThrift.getChunk_meta())); + List<Encoding> encodingList = new ArrayList<Encoding>(datachunkThrift.getEncoders().size()); + for (int i = 0; i < datachunkThrift.getEncoders().size(); i++) { + encodingList.add(fromExternalToWrapperEncoding(datachunkThrift.getEncoders().get(i))); + } + dataChunk.setEncoderList(encodingList); + if (encodingList.contains(Encoding.DELTA)) { + List<ByteBuffer> thriftEncoderMeta = datachunkThrift.getEncoder_meta(); + List<ValueEncoderMeta> encodeMetaList = + new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size()); + for (int i = 0; i < thriftEncoderMeta.size(); i++) { + encodeMetaList.add(CarbonUtil.deserializeEncoderMeta(thriftEncoderMeta.get(i).array())); + } + dataChunk.setValueEncoderMeta(encodeMetaList); + } + return dataChunk; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index de0ea44..4f8a435 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -33,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression; import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; import org.apache.carbondata.core.metadata.ValueEncoderMeta; @@ -40,11 +41,13 @@ import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.BlockletBTreeIndex; import org.apache.carbondata.format.BlockletIndex; import org.apache.carbondata.format.BlockletInfo; +import org.apache.carbondata.format.BlockletInfo2; import org.apache.carbondata.format.BlockletMinMaxIndex; import org.apache.carbondata.format.ChunkCompressionMeta; import org.apache.carbondata.format.ColumnSchema; import org.apache.carbondata.format.CompressionCodec; import org.apache.carbondata.format.DataChunk; +import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.Encoding; import org.apache.carbondata.format.FileFooter; import org.apache.carbondata.format.IndexHeader; @@ -72,22 +75,60 @@ public class CarbonMetadataUtil { * @return 
FileFooter */ public static FileFooter convertFileFooter(List<BlockletInfoColumnar> infoList, int numCols, - int[] cardinalities, List<ColumnSchema> columnSchemaList, - SegmentProperties segmentProperties) throws IOException { + int[] cardinalities, List<ColumnSchema> columnSchemaList, SegmentProperties segmentProperties) + throws IOException { + FileFooter footer = getFileFooter(infoList, cardinalities, columnSchemaList); + for (BlockletInfoColumnar info : infoList) { + footer.addToBlocklet_info_list(getBlockletInfo(info, columnSchemaList, segmentProperties)); + } + return footer; + } + /** + * Below method will be used to get the file footer object + * + * @param infoList blocklet info + * @param cardinalities cardinlaity of dimension columns + * @param columnSchemaList column schema list + * @return file footer + */ + private static FileFooter getFileFooter(List<BlockletInfoColumnar> infoList, int[] cardinalities, + List<ColumnSchema> columnSchemaList) { SegmentInfo segmentInfo = new SegmentInfo(); segmentInfo.setNum_cols(columnSchemaList.size()); segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities)); - + short version = Short.parseShort( + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION)); FileFooter footer = new FileFooter(); + footer.setVersion(version); footer.setNum_rows(getTotalNumberOfRows(infoList)); footer.setSegment_info(segmentInfo); + footer.setTable_columns(columnSchemaList); for (BlockletInfoColumnar info : infoList) { footer.addToBlocklet_index_list(getBlockletIndex(info)); } - footer.setTable_columns(columnSchemaList); + return footer; + } + + /** + * Below method will be used to get the file footer object for + * + * @param infoList blocklet info + * @param cardinalities cardinality of each column + * @param columnSchemaList column schema list + * @param dataChunksOffset data chunks offsets + * @param dataChunksLength data chunks length + * @return filefooter thrift object + 
*/ + public static FileFooter convertFilterFooter2(List<BlockletInfoColumnar> infoList, + int[] cardinalities, List<ColumnSchema> columnSchemaList, List<List<Long>> dataChunksOffset, + List<List<Short>> dataChunksLength) { + FileFooter footer = getFileFooter(infoList, cardinalities, columnSchemaList); + int index = 0; for (BlockletInfoColumnar info : infoList) { - footer.addToBlocklet_info_list(getBlockletInfo(info, columnSchemaList, segmentProperties)); + footer.addToBlocklet_info_list2( + getBlockletInfo2(info, dataChunksOffset.get(index), dataChunksLength.get(index))); + index++; } return footer; } @@ -142,15 +183,31 @@ public class CarbonMetadataUtil { return blockletIndex; } + /** + * Below method will be used to get the blocklet info object for + * data version 2 file + * + * @param blockletInfoColumnar blocklet info + * @param dataChunkOffsets data chunks offsets + * @param dataChunksLength data chunks length + * @return blocklet info version 2 + */ + private static BlockletInfo2 getBlockletInfo2(BlockletInfoColumnar blockletInfoColumnar, + List<Long> dataChunkOffsets, List<Short> dataChunksLength) { + BlockletInfo2 blockletInfo = new BlockletInfo2(); + blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys()); + blockletInfo.setColumn_data_chunks_length(dataChunksLength); + blockletInfo.setColumn_data_chunks_offsets(dataChunkOffsets); + return blockletInfo; + } + private static BlockletInfo getBlockletInfo(BlockletInfoColumnar blockletInfoColumnar, - List<ColumnSchema> columnSchenma, - SegmentProperties segmentProperties) throws IOException { + List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties) throws IOException { BlockletInfo blockletInfo = new BlockletInfo(); blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys()); List<DataChunk> colDataChunks = new ArrayList<DataChunk>(); - blockletInfoColumnar.getKeyLengths(); int j = 0; int aggregateIndex = 0; boolean[] isSortedKeyColumn = 
blockletInfoColumnar.getIsSortedKeyColumn(); @@ -419,6 +476,9 @@ public class CarbonMetadataUtil { segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality)); // create index header object IndexHeader indexHeader = new IndexHeader(); + short version = Short.parseShort( + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION)); + indexHeader.setVersion(version); // set the segment info indexHeader.setSegment_info(segmentInfo); // set the column names @@ -440,11 +500,91 @@ public class CarbonMetadataUtil { for (BlockIndexInfo blockIndexInfo : blockIndexInfoList) { blockIndex = new BlockIndex(); blockIndex.setNum_rows(blockIndexInfo.getNumberOfRows()); - blockIndex.setOffset(blockIndexInfo.getNumberOfRows()); + blockIndex.setOffset(blockIndexInfo.getOffset()); blockIndex.setFile_name(blockIndexInfo.getFileName()); blockIndex.setBlock_index(getBlockletIndex(blockIndexInfo.getBlockletIndex())); thriftBlockIndexList.add(blockIndex); } return thriftBlockIndexList; } + + /** + * Below method will be used to get the data chunk object for all the + * columns + * + * @param blockletInfoColumnar blocklet info + * @param columnSchenma list of columns + * @param segmentProperties segment properties + * @return list of data chunks + * @throws IOException + */ + public static List<DataChunk2> getDatachunk2(BlockletInfoColumnar blockletInfoColumnar, + List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties) throws IOException { + List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>(); + int rowIdIndex = 0; + int aggregateIndex = 0; + boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn(); + boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock(); + boolean[] colGrpblock = blockletInfoColumnar.getColGrpBlocks(); + for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) { + DataChunk2 dataChunk = new DataChunk2(); + 
dataChunk.setChunk_meta(getChunkCompressionMeta()); + List<Encoding> encodings = new ArrayList<Encoding>(); + if (containsEncoding(i, Encoding.DICTIONARY, columnSchenma, segmentProperties)) { + encodings.add(Encoding.DICTIONARY); + } + if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) { + encodings.add(Encoding.DIRECT_DICTIONARY); + } + dataChunk.setRowMajor(colGrpblock[i]); + //TODO : Once schema PR is merged and information needs to be passed here. + dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]); + if (aggKeyBlock[i]) { + dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]); + encodings.add(Encoding.RLE); + aggregateIndex++; + } + dataChunk + .setSort_state(isSortedKeyColumn[i] ? SortState.SORT_EXPLICIT : SortState.SORT_NATIVE); + + if (!isSortedKeyColumn[i]) { + dataChunk.setRowid_page_length(blockletInfoColumnar.getKeyBlockIndexLength()[rowIdIndex]); + encodings.add(Encoding.INVERTED_INDEX); + rowIdIndex++; + } + + //TODO : Right now the encodings are happening at runtime. change as per this encoders. + dataChunk.setEncoders(encodings); + + colDataChunks.add(dataChunk); + } + + for (int i = 0; i < blockletInfoColumnar.getMeasureLength().length; i++) { + DataChunk2 dataChunk = new DataChunk2(); + dataChunk.setChunk_meta(getChunkCompressionMeta()); + dataChunk.setRowMajor(false); + //TODO : Once schema PR is merged and information needs to be passed here. + dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]); + //TODO : Right now the encodings are happening at runtime. change as per this encoders. 
+ List<Encoding> encodings = new ArrayList<Encoding>(); + encodings.add(Encoding.DELTA); + dataChunk.setEncoders(encodings); + //TODO writing dummy presence meta need to set actual presence + //meta + PresenceMeta presenceMeta = new PresenceMeta(); + presenceMeta.setPresent_bit_streamIsSet(true); + presenceMeta.setPresent_bit_stream(SnappyByteCompression.INSTANCE + .compress(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray())); + dataChunk.setPresence(presenceMeta); + //TODO : PresenceMeta needs to be implemented and set here + // dataChunk.setPresence(new PresenceMeta()); + //TODO : Need to write ValueCompression meta here. + List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>(); + encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta( + createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i)))); + dataChunk.setEncoder_meta(encoderMetaList); + colDataChunks.add(dataChunk); + } + return colDataChunks; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index b856928..adb0e6a 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -83,6 +83,7 @@ public final class CarbonProperties { validateHighCardinalityIdentify(); validateHighCardinalityThreshold(); validateHighCardinalityInRowCountPercentage(); + validateCarbonDataFileVersion(); } private void validateBadRecordsLocation() { @@ -106,15 +107,15 @@ public final class CarbonProperties { if (blockletSize < CarbonCommonConstants.BLOCKLET_SIZE_MIN_VAL || blockletSize > CarbonCommonConstants.BLOCKLET_SIZE_MAX_VAL) { LOGGER.info("The blocklet 
size value \"" + blockletSizeStr - + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL); + + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL); carbonProperties.setProperty(CarbonCommonConstants.BLOCKLET_SIZE, CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL); } } catch (NumberFormatException e) { LOGGER.info("The blocklet size value \"" + blockletSizeStr - + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL); + + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL); carbonProperties.setProperty(CarbonCommonConstants.BLOCKLET_SIZE, CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL); } @@ -131,16 +132,16 @@ public final class CarbonProperties { if (numCores < CarbonCommonConstants.NUM_CORES_MIN_VAL || numCores > CarbonCommonConstants.NUM_CORES_MAX_VAL) { - LOGGER.info("The num Cores value \"" + numCoresStr - + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); + LOGGER.info( + "The num Cores value \"" + numCoresStr + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES, CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); } } catch (NumberFormatException e) { - LOGGER.info("The num Cores value \"" + numCoresStr - + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); + LOGGER.info( + "The num Cores value \"" + numCoresStr + "\" is invalid. 
Using the default value \"" + + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES, CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); } @@ -150,9 +151,8 @@ public final class CarbonProperties { * This method validates the number cores specified for mdk block sort */ private void validateNumCoresBlockSort() { - String numCoresStr = carbonProperties - .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT, - CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL); + String numCoresStr = carbonProperties.getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT, + CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL); try { int numCores = Integer.parseInt(numCoresStr); @@ -183,25 +183,25 @@ public final class CarbonProperties { int sortSize = Integer.parseInt(sortSizeStr); if (sortSize < CarbonCommonConstants.SORT_SIZE_MIN_VAL) { - LOGGER.info("The batch size value \"" + sortSizeStr - + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL); + LOGGER.info( + "The batch size value \"" + sortSizeStr + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL); carbonProperties.setProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL); } } catch (NumberFormatException e) { - LOGGER.info("The batch size value \"" + sortSizeStr - + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL); + LOGGER.info( + "The batch size value \"" + sortSizeStr + "\" is invalid. 
Using the default value \"" + + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL); carbonProperties.setProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL); } } private void validateHighCardinalityIdentify() { - String highcardIdentifyStr = carbonProperties.getProperty( - CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE, - CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT); + String highcardIdentifyStr = carbonProperties + .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE, + CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT); try { Boolean.parseBoolean(highcardIdentifyStr); } catch (NumberFormatException e) { @@ -214,12 +214,12 @@ public final class CarbonProperties { } private void validateHighCardinalityThreshold() { - String highcardThresholdStr = carbonProperties.getProperty( - CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD, - CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT); + String highcardThresholdStr = carbonProperties + .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD, + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT); try { int highcardThreshold = Integer.parseInt(highcardThresholdStr); - if(highcardThreshold < CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN){ + if (highcardThreshold < CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN) { LOGGER.info("The high cardinality threshold value \"" + highcardThresholdStr + "\" is invalid. 
Using the min value \"" + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN); @@ -236,22 +236,22 @@ public final class CarbonProperties { } private void validateHighCardinalityInRowCountPercentage() { - String highcardPercentageStr = carbonProperties.getProperty( - CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE, - CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); + String highcardPercentageStr = carbonProperties + .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE, + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); try { double highcardPercentage = Double.parseDouble(highcardPercentageStr); - if(highcardPercentage <= 0){ - LOGGER.info("The percentage of high cardinality in row count value \"" - + highcardPercentageStr + "\" is invalid. Using the default value \"" - + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); - carbonProperties.setProperty( - CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE, + if (highcardPercentage <= 0) { + LOGGER.info( + "The percentage of high cardinality in row count value \"" + highcardPercentageStr + + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); + carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE, CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); } } catch (NumberFormatException e) { - LOGGER.info("The percentage of high cardinality in row count value \"" - + highcardPercentageStr + "\" is invalid. Using the default value \"" + LOGGER.info("The percentage of high cardinality in row count value \"" + highcardPercentageStr + + "\" is invalid. 
Using the default value \"" + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE, CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT); @@ -259,6 +259,34 @@ public final class CarbonProperties { } /** + * Below method will be used to validate the data file version parameter + * if parameter is invalid current version will be set + */ + private void validateCarbonDataFileVersion() { + short carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION; + String carbondataFileVersionString = + carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION); + try { + carbondataFileVersion = Short.parseShort(carbondataFileVersionString); + } catch (NumberFormatException e) { + carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION; + LOGGER.info("Current Data file version property is invalid \"" + carbondataFileVersionString + + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion); + carbonProperties + .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + ""); + } + if (carbondataFileVersion > CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + || carbondataFileVersion < 0) { + LOGGER.info("Current Data file version property is invalid \"" + carbondataFileVersionString + + "\" is invalid. 
Using the Current data file version value \"" + carbondataFileVersion); + carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION; + carbonProperties + .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + ""); + } + + } + + /** * This method will read all the properties from file and load it into * memory */ @@ -278,18 +306,18 @@ public final class CarbonProperties { carbonProperties.load(fis); } } catch (FileNotFoundException e) { - LOGGER.error("The file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH - + " does not exist"); + LOGGER.error( + "The file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH + " does not exist"); } catch (IOException e) { - LOGGER.error("Error while reading the file: " - + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH); + LOGGER.error( + "Error while reading the file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH); } finally { if (null != fis) { try { fis.close(); } catch (IOException e) { LOGGER.error("Error while closing the file stream for file: " - + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH); + + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH); } } } @@ -402,6 +430,7 @@ public final class CarbonProperties { /** * gettting the unmerged segment numbers to be merged. 
+ * * @return */ public int[] getCompactionSegmentLevelCount() { @@ -411,7 +440,7 @@ public final class CarbonProperties { CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD); int[] compactionSize = getIntArray(commaSeparatedLevels); - if(null == compactionSize){ + if (null == compactionSize) { compactionSize = getIntArray(CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD); } @@ -419,7 +448,6 @@ public final class CarbonProperties { } /** - * * @param commaSeparatedLevels * @return */ @@ -430,13 +458,12 @@ public final class CarbonProperties { for (String levelSize : levels) { try { int size = Integer.parseInt(levelSize.trim()); - if(validate(size,100,0,-1) < 0 ){ + if (validate(size, 100, 0, -1) < 0) { // if given size is out of boundary then take default value for all levels. return null; } compactionSize[i++] = size; - } - catch(NumberFormatException e){ + } catch (NumberFormatException e) { LOGGER.error( "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD + " is not proper. 
Taking the default value " http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 3c976db..162e9b9 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -21,6 +21,8 @@ package org.apache.carbondata.core.util; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; @@ -28,6 +30,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.ObjectInputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; @@ -49,7 +52,6 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk; import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; @@ -67,11 +69,20 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import 
org.apache.carbondata.core.metadata.ValueEncoderMeta; +import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator; +import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.scan.model.QueryDimension; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TIOStreamTransport; + import org.pentaho.di.core.exception.KettleException; @@ -946,29 +957,25 @@ public final class CarbonUtil { * @return value compression model */ public static ValueCompressionModel getValueCompressionModel( - List<DataChunk> measureDataChunkList) { - Object[] maxValue = new Object[measureDataChunkList.size()]; - Object[] minValue = new Object[measureDataChunkList.size()]; - Object[] uniqueValue = new Object[measureDataChunkList.size()]; - int[] decimal = new int[measureDataChunkList.size()]; - char[] type = new char[measureDataChunkList.size()]; - byte[] dataTypeSelected = new byte[measureDataChunkList.size()]; + List<ValueEncoderMeta> encodeMetaList) { + Object[] maxValue = new Object[encodeMetaList.size()]; + Object[] minValue = new Object[encodeMetaList.size()]; + Object[] uniqueValue = new Object[encodeMetaList.size()]; + int[] decimal = new int[encodeMetaList.size()]; + char[] type = new char[encodeMetaList.size()]; + byte[] dataTypeSelected = new byte[encodeMetaList.size()]; /** * to fill the meta data required for value compression model */ for (int i = 0; i < dataTypeSelected.length; i++) { - int indexOf = measureDataChunkList.get(i).getEncodingList().indexOf(Encoding.DELTA); - if (indexOf > -1) { - ValueEncoderMeta valueEncoderMeta = - measureDataChunkList.get(i).getValueEncoderMeta().get(indexOf); - maxValue[i] 
= valueEncoderMeta.getMaxValue(); - minValue[i] = valueEncoderMeta.getMinValue(); - uniqueValue[i] = valueEncoderMeta.getUniqueValue(); - decimal[i] = valueEncoderMeta.getDecimal(); - type[i] = valueEncoderMeta.getType(); - dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected(); - } + ValueEncoderMeta valueEncoderMeta = encodeMetaList.get(i); + maxValue[i] = valueEncoderMeta.getMaxValue(); + minValue[i] = valueEncoderMeta.getMinValue(); + uniqueValue[i] = valueEncoderMeta.getUniqueValue(); + decimal[i] = valueEncoderMeta.getDecimal(); + type[i] = valueEncoderMeta.getType(); + dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected(); } MeasureMetaDataModel measureMetadataModel = new MeasureMetaDataModel(minValue, maxValue, decimal, dataTypeSelected.length, uniqueValue, @@ -1055,11 +1062,13 @@ public final class CarbonUtil { * @return Data file metadata instance * @throws CarbonUtilException */ - public static DataFileFooter readMetadatFile(String filePath, long blockOffset, long blockLength) + public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws CarbonUtilException { - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + AbstractDataFileFooterConverter fileFooterConverter = + DataFileFooterConverterFactory.getInstance() + .getDataFileFooterConverter(tableBlockInfo.getVersion()); try { - return fileFooterConverter.readDataFileFooter(filePath, blockOffset, blockLength); + return fileFooterConverter.readDataFileFooter(tableBlockInfo); } catch (IOException e) { throw new CarbonUtilException("Problem while reading the file metadata", e); } @@ -1462,5 +1471,161 @@ public final class CarbonUtil { return segmentStringbuilder.toString(); } + /** + * Below method will be used to convert the thrift object to byte array. 
+ */ + public static byte[] getByteArray(TBase t) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + byte[] thriftByteArray = null; + TProtocol binaryOut = new TCompactProtocol(new TIOStreamTransport(stream)); + try { + t.write(binaryOut); + stream.flush(); + thriftByteArray = stream.toByteArray(); + } catch (TException | IOException e) { + closeStreams(stream); + } finally { + closeStreams(stream); + } + return thriftByteArray; + } + + /** + * Below method will be used to convert the bytearray to data chunk object + * + * @param dataChunkBytes datachunk thrift object in bytes + * @return data chunk thrift object + */ + public static DataChunk2 readDataChunk(byte[] dataChunkBytes) { + try { + return (DataChunk2) read(dataChunkBytes, new ThriftReader.TBaseCreator() { + @Override public TBase create() { + return new DataChunk2(); + } + }); + } catch (IOException e) { + LOGGER.error(e); + } + return null; + } + + /** + * Below method will be used to convert the byte array value to thrift object for + * data chunk + * + * @param data thrift byte array + * @param creator type of thrift + * @return thrift object + * @throws IOException any problem while converting the object + */ + private static TBase read(byte[] data, TBaseCreator creator) throws IOException { + ByteArrayInputStream stream = new ByteArrayInputStream(data); + TProtocol binaryIn = new TCompactProtocol(new TIOStreamTransport(stream)); + TBase t = creator.create(); + try { + t.read(binaryIn); + } catch (TException e) { + throw new IOException(e); + } finally { + CarbonUtil.closeStreams(stream); + } + return t; + } + + /** + * Below method will be used to convert the encode metadata to + * ValueEncoderMeta object + * + * @param encoderMeta + * @return ValueEncoderMeta object + */ + public static ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) { + // TODO : should remove the unnecessary fields. 
+ ByteArrayInputStream aos = null; + ObjectInputStream objStream = null; + ValueEncoderMeta meta = null; + try { + aos = new ByteArrayInputStream(encoderMeta); + objStream = new ObjectInputStream(aos); + meta = (ValueEncoderMeta) objStream.readObject(); + } catch (ClassNotFoundException e) { + LOGGER.error(e); + } catch (IOException e) { + CarbonUtil.closeStreams(objStream); + } + return meta; + } + + /** + * Below method will be used to convert indexes in range + * Indexes=[0,1,2,3,4,5,6,7,8,9] + * Length=9 + * number of element in group =5 + * then output will be [0,1,2,3,4],[5,6,7,8],[9] + * + * @param indexes indexes + * @param length number of element to be considered + * @param numberOfElementInGroup number of element in group + * @return range indexes + */ + public static int[][] getRangeIndex(int[] indexes, int length, int numberOfElementInGroup) { + List<List<Integer>> rangeList = new ArrayList<>(); + int[][] outputArray = null; + int k = 0; + int index = 1; + if (indexes.length == 1) { + outputArray = new int[1][2]; + outputArray[0][0] = indexes[0]; + outputArray[0][1] = indexes[0]; + return outputArray; + } + while (index < length) { + if (indexes[index] - indexes[index - 1] == 1 && k < numberOfElementInGroup - 1) { + k++; + } else { + if (k > 0) { + List<Integer> range = new ArrayList<>(); + rangeList.add(range); + range.add(indexes[index - k - 1]); + range.add(indexes[index - 1]); + } else { + List<Integer> range = new ArrayList<>(); + rangeList.add(range); + range.add(indexes[index - 1]); + } + k = 0; + } + index++; + } + if (k > 0) { + List<Integer> range = new ArrayList<>(); + rangeList.add(range); + range.add(indexes[index - k - 1]); + range.add(indexes[index - 1]); + } else { + List<Integer> range = new ArrayList<>(); + rangeList.add(range); + range.add(indexes[index - 1]); + + } + if (length != indexes.length) { + List<Integer> range = new ArrayList<>(); + rangeList.add(range); + range.add(indexes[indexes.length - 1]); + } + + // as diving in 
range so array size will be always 2 + outputArray = new int[rangeList.size()][2]; + for (int i = 0; i < outputArray.length; i++) { + if (rangeList.get(i).size() == 1) { + outputArray[i][0] = rangeList.get(i).get(0); + outputArray[i][1] = rangeList.get(i).get(0); + } else { + outputArray[i][0] = rangeList.get(i).get(0); + outputArray[i][1] = rangeList.get(i).get(1); + } + } + return outputArray; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java index 5f3565c..ea1324e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java @@ -18,135 +18,44 @@ */ package org.apache.carbondata.core.util; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.BitSet; import java.util.Iterator; import java.util.List; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.carbon.datastore.block.BlockInfo; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo; -import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.ChunkCompressorMeta; -import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec; import 
org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk; -import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta; -import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex; import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex; -import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex; -import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState; -import org.apache.carbondata.core.carbon.metadata.datatype.DataType; -import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastorage.store.FileHolder; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.metadata.ValueEncoderMeta; import org.apache.carbondata.core.reader.CarbonFooterReader; -import org.apache.carbondata.core.reader.CarbonIndexFileReader; -import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.FileFooter; /** * Below class will be used to convert the thrift object of data file * meta data to wrapper object */ -public class DataFileFooterConverter { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(DataFileFooterConverter.class.getName()); - - /** - * Below method will be used to get the index info from index file - * - * @param filePath file path of the index file - * @param tableBlockInfoList table block index - * @return list of index info - * @throws IOException problem while reading the index file - */ - public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList) - throws IOException, CarbonUtilException { - CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); - List<DataFileFooter> dataFileFooters = new 
ArrayList<DataFileFooter>(); - try { - // open the reader - indexReader.openThriftReader(filePath); - // get the index header - org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader(); - List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); - List<org.apache.carbondata.format.ColumnSchema> table_columns = - readIndexHeader.getTable_columns(); - for (int i = 0; i < table_columns.size(); i++) { - columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))); - } - // get the segment info - SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info()); - BlockletIndex blockletIndex = null; - int counter = 0; - DataFileFooter dataFileFooter = null; - // read the block info from file - while (indexReader.hasNext()) { - BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo(); - blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index()); - dataFileFooter = new DataFileFooter(); - TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++); - int blockletSize = getBlockletSize(readBlockIndexInfo); - tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize); - dataFileFooter.setBlockletIndex(blockletIndex); - dataFileFooter.setColumnInTable(columnSchemaList); - dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows()); - dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo)); - dataFileFooter.setSegmentInfo(segmentInfo); - dataFileFooters.add(dataFileFooter); - } - } finally { - indexReader.closeThriftReader(); - } - return dataFileFooters; - } - - /** - * the methods returns the number of blocklets in a block - * @param readBlockIndexInfo - * @return - */ - private int getBlockletSize(BlockIndex readBlockIndexInfo) { - long num_rows = readBlockIndexInfo.getNum_rows(); - int blockletSize = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, - CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)); - int 
remainder = (int) (num_rows % blockletSize); - int noOfBlockLet = (int) (num_rows / blockletSize); - // there could be some blocklets which will not - // contain the total records equal to the blockletSize - if (remainder > 0) { - noOfBlockLet = noOfBlockLet + 1; - } - return noOfBlockLet; - } +public class DataFileFooterConverter extends AbstractDataFileFooterConverter { /** * Below method will be used to convert thrift file meta to wrapper file meta */ - public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength) + @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo) throws IOException { DataFileFooter dataFileFooter = new DataFileFooter(); FileHolder fileReader = null; try { - long completeBlockLength = blockOffset + blockLength; + long completeBlockLength = tableBlockInfo.getBlockLength(); long footerPointer = completeBlockLength - 8; - fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)); - long actualFooterOffset = fileReader.readLong(filePath, footerPointer); - CarbonFooterReader reader = new CarbonFooterReader(filePath, actualFooterOffset); + fileReader = FileFactory.getFileHolder(FileFactory.getFileType(tableBlockInfo.getFilePath())); + long actualFooterOffset = fileReader.readLong(tableBlockInfo.getFilePath(), footerPointer); + CarbonFooterReader reader = + new CarbonFooterReader(tableBlockInfo.getFilePath(), actualFooterOffset); FileFooter footer = reader.readFooter(); - dataFileFooter.setVersionId(footer.getVersion()); + dataFileFooter.setVersionId((short) footer.getVersion()); dataFileFooter.setNumberOfRows(footer.getNum_rows()); dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info())); List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); @@ -183,66 +92,6 @@ public class DataFileFooterConverter { } /** - * Below method will be used to get blocklet index for data file meta - * - * @param blockletIndexList - * @return blocklet index - */ - 
private BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> blockletIndexList) { - BlockletIndex blockletIndex = new BlockletIndex(); - BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex(); - blockletBTreeIndex.setStartKey(blockletIndexList.get(0).getBtreeIndex().getStartKey()); - blockletBTreeIndex - .setEndKey(blockletIndexList.get(blockletIndexList.size() - 1).getBtreeIndex().getEndKey()); - blockletIndex.setBtreeIndex(blockletBTreeIndex); - byte[][] currentMinValue = blockletIndexList.get(0).getMinMaxIndex().getMinValues().clone(); - byte[][] currentMaxValue = blockletIndexList.get(0).getMinMaxIndex().getMaxValues().clone(); - byte[][] minValue = null; - byte[][] maxValue = null; - for (int i = 1; i < blockletIndexList.size(); i++) { - minValue = blockletIndexList.get(i).getMinMaxIndex().getMinValues(); - maxValue = blockletIndexList.get(i).getMinMaxIndex().getMaxValues(); - for (int j = 0; j < maxValue.length; j++) { - if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) { - currentMinValue[j] = minValue[j].clone(); - } - if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) { - currentMaxValue[j] = maxValue[j].clone(); - } - } - } - - BlockletMinMaxIndex minMax = new BlockletMinMaxIndex(); - minMax.setMaxValues(currentMaxValue); - minMax.setMinValues(currentMinValue); - blockletIndex.setMinMaxIndex(minMax); - return blockletIndex; - } - - private ColumnSchema thriftColumnSchmeaToWrapperColumnSchema( - org.apache.carbondata.format.ColumnSchema externalColumnSchema) { - ColumnSchema wrapperColumnSchema = new ColumnSchema(); - wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id()); - wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name()); - wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar()); - wrapperColumnSchema - .setDataType(thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type)); - 
wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension()); - List<Encoding> encoders = new ArrayList<Encoding>(); - for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) { - encoders.add(fromExternalToWrapperEncoding(encoder)); - } - wrapperColumnSchema.setEncodingList(encoders); - wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child()); - wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision()); - wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id()); - wrapperColumnSchema.setScale(externalColumnSchema.getScale()); - wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value()); - wrapperColumnSchema.setAggregateFunction(externalColumnSchema.getAggregate_function()); - return wrapperColumnSchema; - } - - /** * Below method is to convert the blocklet info of the thrift to wrapper * blocklet info * @@ -273,228 +122,4 @@ public class DataFileFooterConverter { blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows()); return blockletInfo; } - - /** - * Below method is convert the thrift encoding to wrapper encoding - * - * @param encoderThrift thrift encoding - * @return wrapper encoding - */ - private Encoding fromExternalToWrapperEncoding( - org.apache.carbondata.format.Encoding encoderThrift) { - switch (encoderThrift) { - case DICTIONARY: - return Encoding.DICTIONARY; - case DELTA: - return Encoding.DELTA; - case RLE: - return Encoding.RLE; - case INVERTED_INDEX: - return Encoding.INVERTED_INDEX; - case BIT_PACKED: - return Encoding.BIT_PACKED; - case DIRECT_DICTIONARY: - return Encoding.DIRECT_DICTIONARY; - default: - return Encoding.DICTIONARY; - } - } - - /** - * Below method will be used to convert the thrift compression to wrapper - * compression codec - * - * @param compressionCodecThrift - * @return wrapper compression codec - */ - private CompressionCodec getCompressionCodec( - org.apache.carbondata.format.CompressionCodec 
compressionCodecThrift) { - switch (compressionCodecThrift) { - case SNAPPY: - return CompressionCodec.SNAPPY; - default: - return CompressionCodec.SNAPPY; - } - } - - /** - * Below method will be used to convert thrift segment object to wrapper - * segment object - * - * @param segmentInfo thrift segment info object - * @return wrapper segment info object - */ - private SegmentInfo getSegmentInfo(org.apache.carbondata.format.SegmentInfo segmentInfo) { - SegmentInfo info = new SegmentInfo(); - int[] cardinality = new int[segmentInfo.getColumn_cardinalities().size()]; - for (int i = 0; i < cardinality.length; i++) { - cardinality[i] = segmentInfo.getColumn_cardinalities().get(i); - } - info.setColumnCardinality(cardinality); - info.setNumberOfColumns(segmentInfo.getNum_cols()); - return info; - } - - /** - * Below method will be used to convert the blocklet index of thrift to - * wrapper - * - * @param blockletIndexThrift - * @return blocklet index wrapper - */ - private BlockletIndex getBlockletIndex( - org.apache.carbondata.format.BlockletIndex blockletIndexThrift) { - org.apache.carbondata.format.BlockletBTreeIndex btreeIndex = - blockletIndexThrift.getB_tree_index(); - org.apache.carbondata.format.BlockletMinMaxIndex minMaxIndex = - blockletIndexThrift.getMin_max_index(); - return new BlockletIndex( - new BlockletBTreeIndex(btreeIndex.getStart_key(), btreeIndex.getEnd_key()), - new BlockletMinMaxIndex(minMaxIndex.getMin_values(), minMaxIndex.getMax_values())); - } - - /** - * Below method will be used to convert the thrift compression meta to - * wrapper chunk compression meta - * - * @param chunkCompressionMetaThrift - * @return chunkCompressionMetaWrapper - */ - private ChunkCompressorMeta getChunkCompressionMeta( - org.apache.carbondata.format.ChunkCompressionMeta chunkCompressionMetaThrift) { - ChunkCompressorMeta compressorMeta = new ChunkCompressorMeta(); - compressorMeta - 
.setCompressor(getCompressionCodec(chunkCompressionMetaThrift.getCompression_codec())); - compressorMeta.setCompressedSize(chunkCompressionMetaThrift.getTotal_compressed_size()); - compressorMeta.setUncompressedSize(chunkCompressionMetaThrift.getTotal_uncompressed_size()); - return compressorMeta; - } - - /** - * Below method will be used to convert the thrift data type to wrapper data - * type - * - * @param dataTypeThrift - * @return dataType wrapper - */ - private DataType thriftDataTyopeToWrapperDataType( - org.apache.carbondata.format.DataType dataTypeThrift) { - switch (dataTypeThrift) { - case STRING: - return DataType.STRING; - case SHORT: - return DataType.SHORT; - case INT: - return DataType.INT; - case LONG: - return DataType.LONG; - case DOUBLE: - return DataType.DOUBLE; - case DECIMAL: - return DataType.DECIMAL; - case TIMESTAMP: - return DataType.TIMESTAMP; - case ARRAY: - return DataType.ARRAY; - case STRUCT: - return DataType.STRUCT; - default: - return DataType.STRING; - } - } - - /** - * Below method will be used to convert the thrift presence meta to wrapper - * presence meta - * - * @param presentMetadataThrift - * @return wrapper presence meta - */ - private PresenceMeta getPresenceMeta( - org.apache.carbondata.format.PresenceMeta presentMetadataThrift) { - PresenceMeta presenceMeta = new PresenceMeta(); - presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence()); - presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream())); - return presenceMeta; - } - - /** - * Below method will be used to convert the thrift object to wrapper object - * - * @param sortStateThrift - * @return wrapper sort state object - */ - private SortState getSortState(org.apache.carbondata.format.SortState sortStateThrift) { - if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_EXPLICIT) { - return SortState.SORT_EXPLICT; - } else if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_NATIVE) { 
- return SortState.SORT_NATIVE; - } else { - return SortState.SORT_NONE; - } - } - - /** - * Below method will be used to convert the thrift data chunk to wrapper - * data chunk - * - * @param datachunkThrift - * @return wrapper data chunk - */ - private DataChunk getDataChunk(org.apache.carbondata.format.DataChunk datachunkThrift, - boolean isPresenceMetaPresent) { - DataChunk dataChunk = new DataChunk(); - dataChunk.setColumnUniqueIdList(datachunkThrift.getColumn_ids()); - dataChunk.setDataPageLength(datachunkThrift.getData_page_length()); - dataChunk.setDataPageOffset(datachunkThrift.getData_page_offset()); - if (isPresenceMetaPresent) { - dataChunk.setNullValueIndexForColumn(getPresenceMeta(datachunkThrift.getPresence())); - } - dataChunk.setRlePageLength(datachunkThrift.getRle_page_length()); - dataChunk.setRlePageOffset(datachunkThrift.getRle_page_offset()); - dataChunk.setRowMajor(datachunkThrift.isRowMajor()); - dataChunk.setRowIdPageLength(datachunkThrift.getRowid_page_length()); - dataChunk.setRowIdPageOffset(datachunkThrift.getRowid_page_offset()); - dataChunk.setSortState(getSortState(datachunkThrift.getSort_state())); - dataChunk.setChunkCompressionMeta(getChunkCompressionMeta(datachunkThrift.getChunk_meta())); - List<Encoding> encodingList = new ArrayList<Encoding>(datachunkThrift.getEncoders().size()); - for (int i = 0; i < datachunkThrift.getEncoders().size(); i++) { - encodingList.add(fromExternalToWrapperEncoding(datachunkThrift.getEncoders().get(i))); - } - dataChunk.setEncoderList(encodingList); - if (encodingList.contains(Encoding.DELTA)) { - List<ByteBuffer> thriftEncoderMeta = datachunkThrift.getEncoder_meta(); - List<ValueEncoderMeta> encodeMetaList = - new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size()); - for (int i = 0; i < thriftEncoderMeta.size(); i++) { - encodeMetaList.add(deserializeEncoderMeta(thriftEncoderMeta.get(i).array())); - } - dataChunk.setValueEncoderMeta(encodeMetaList); - } - return dataChunk; - } - - /** - * Below 
method will be used to convert the encode metadata to - * ValueEncoderMeta object - * - * @param encoderMeta - * @return ValueEncoderMeta object - */ - private ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) { - // TODO : should remove the unnecessary fields. - ByteArrayInputStream aos = null; - ObjectInputStream objStream = null; - ValueEncoderMeta meta = null; - try { - aos = new ByteArrayInputStream(encoderMeta); - objStream = new ObjectInputStream(aos); - meta = (ValueEncoderMeta) objStream.readObject(); - } catch (ClassNotFoundException e) { - LOGGER.error(e); - } catch (IOException e) { - CarbonUtil.closeStreams(objStream); - } - return meta; - } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java new file mode 100644 index 0000000..d971756 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.carbondata.core.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonFooterReader;
import org.apache.carbondata.format.FileFooter;

/**
 * Converts the thrift object of the data file meta data into the wrapper
 * {@link DataFileFooter} object for a version 2 data file.
 */
public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {

  /**
   * Reads the thrift file footer of the given block and converts it to the wrapper footer.
   *
   * @param tableBlockInfo block whose file path and block offset are handed to the footer reader
   * @return wrapper footer carrying version id, row count, segment info, column schema,
   * blocklet list and the file level blocklet index
   * @throws IOException declared by the overridden method; presumably raised by the footer
   * reader when the footer cannot be read
   */
  @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
      throws IOException {
    DataFileFooter dataFileFooter = new DataFileFooter();
    CarbonFooterReader reader =
        new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
    FileFooter footer = reader.readFooter();
    dataFileFooter.setVersionId((short) footer.getVersion());
    dataFileFooter.setNumberOfRows(footer.getNum_rows());
    dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));

    // thrift column schema -> wrapper column schema
    List<org.apache.carbondata.format.ColumnSchema> thriftColumns = footer.getTable_columns();
    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(thriftColumns.size());
    for (org.apache.carbondata.format.ColumnSchema thriftColumn : thriftColumns) {
      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(thriftColumn));
    }
    dataFileFooter.setColumnInTable(columnSchemaList);

    // thrift blocklet index -> wrapper blocklet index
    List<org.apache.carbondata.format.BlockletIndex> thriftBlockletIndexes =
        footer.getBlocklet_index_list();
    List<BlockletIndex> blockletIndexList =
        new ArrayList<BlockletIndex>(thriftBlockletIndexes.size());
    for (org.apache.carbondata.format.BlockletIndex thriftBlockletIndex : thriftBlockletIndexes) {
      blockletIndexList.add(getBlockletIndex(thriftBlockletIndex));
    }

    // the dimension column count depends only on the schema, so it is computed once
    // instead of once per blocklet
    int numberOfDimensionColumns = getNumberOfDimensionColumns(columnSchemaList);
    List<org.apache.carbondata.format.BlockletInfo2> thriftBlockletInfos =
        footer.getBlocklet_info_list2();
    List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>(thriftBlockletInfos.size());
    for (int i = 0; i < thriftBlockletInfos.size(); i++) {
      BlockletInfo blockletInfo =
          getBlockletInfo(thriftBlockletInfos.get(i), numberOfDimensionColumns);
      // blocklet info and blocklet index are paired by their position in the footer lists
      blockletInfo.setBlockletIndex(blockletIndexList.get(i));
      blockletInfoList.add(blockletInfo);
    }
    dataFileFooter.setBlockletList(blockletInfoList);
    dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
    return dataFileFooter;
  }

  /**
   * Below method is to convert the blocklet info of the thrift to wrapper
   * blocklet info. The first {@code numberOfDimensionColumns} entries of the chunk
   * offset/length lists are treated as dimension chunks, the remaining entries as
   * measure chunks.
   *
   * @param blockletInfoThrift       blocklet info of the thrift
   * @param numberOfDimensionColumns number of leading entries that belong to dimension chunks
   * @return blocklet info wrapper
   */
  private BlockletInfo getBlockletInfo(
      org.apache.carbondata.format.BlockletInfo2 blockletInfoThrift, int numberOfDimensionColumns) {
    List<Long> chunkOffsets = blockletInfoThrift.getColumn_data_chunks_offsets();
    List<Short> chunkLengths = blockletInfoThrift.getColumn_data_chunks_length();
    BlockletInfo blockletInfo = new BlockletInfo();
    blockletInfo.setDimensionChunkOffsets(chunkOffsets.subList(0, numberOfDimensionColumns));
    blockletInfo.setMeasureChunkOffsets(
        chunkOffsets.subList(numberOfDimensionColumns, chunkOffsets.size()));
    blockletInfo.setDimensionChunksLength(chunkLengths.subList(0, numberOfDimensionColumns));
    // the length list is bounded by its own size; bounding it by the size of the offset
    // list fails or drops entries whenever the two lists differ in size
    blockletInfo.setMeasureChunksLength(
        chunkLengths.subList(numberOfDimensionColumns, chunkLengths.size()));
    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
    return blockletInfo;
  }

  /**
   * Below method will be used to get the number of dimension column
   * in carbon column schema. A columnar dimension column counts as one; a
   * non columnar dimension column counts only when its column group id differs
   * from the previously seen group id. Counting stops at the first column that
   * is not a dimension column.
   *
   * @param columnSchemaList column schema list
   * @return number of dimension column
   */
  private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
    int numberOfDimensionColumns = 0;
    int previousColumnGroupId = -1;
    for (ColumnSchema columnSchema : columnSchemaList) {
      if (!columnSchema.isDimensionColumn()) {
        // first non dimension column ends the scan
        break;
      }
      if (columnSchema.isColumnar()) {
        numberOfDimensionColumns++;
      } else if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
        previousColumnGroupId = columnSchema.getColumnGroupId();
        numberOfDimensionColumns++;
      }
    }
    return numberOfDimensionColumns;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java new file mode 100644 index 0000000..a079ad7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.carbondata.core.util;


/**
 * Factory that hands out the data file footer converter matching a
 * data file version. Only one factory object exists; obtain it through
 * {@link #getInstance()}.
 */
public class DataFileFooterConverterFactory {

  /**
   * the single factory object, created when the class is initialised
   */
  private static final DataFileFooterConverterFactory INSTANCE =
      new DataFileFooterConverterFactory();

  /**
   * not instantiable from outside, use {@link #getInstance()}
   */
  private DataFileFooterConverterFactory() {
  }

  /**
   * Gives access to the shared factory object.
   *
   * @return DataFileFooterConverterFactory instance
   */
  public static DataFileFooterConverterFactory getInstance() {
    return INSTANCE;
  }

  /**
   * Creates a new footer converter for the given data file version.
   *
   * @param versionNumber version of the data file; 2 selects the version 2
   *                      converter, every other value the default converter
   * @return footer converter instance
   */
  public AbstractDataFileFooterConverter getDataFileFooterConverter(final short versionNumber) {
    if (versionNumber == 2) {
      return new DataFileFooterConverter2();
    }
    return new DataFileFooterConverter();
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java ---------------------------------------------------------------------- diff --git
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java index 04d2b97..758c2d9 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java @@ -28,9 +28,6 @@ import org.apache.carbondata.format.FileFooter; */ public class CarbonFooterWriter { - // It is version number of this format class. - private static int VERSION_NUMBER = 1; - // Fact file path private String filePath; @@ -48,7 +45,6 @@ public class CarbonFooterWriter { public void writeFooter(FileFooter footer, long currentPosition) throws IOException { ThriftWriter thriftWriter = openThriftWriter(filePath); - footer.setVersion(VERSION_NUMBER); try { thriftWriter.write(footer); thriftWriter.writeOffset(currentPosition); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java index 85e979a..8289c8b 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java @@ -18,7 +18,12 @@ */ package org.apache.carbondata.scan.executor.impl; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.Executors; import org.apache.carbondata.common.logging.LogService; @@ -33,7 +38,8 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import 
org.apache.carbondata.core.carbon.metadata.encoder.Encoding; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.carbon.querystatistics.*; +import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic; +import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstants; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; import org.apache.carbondata.core.keygenerator.KeyGenException; @@ -90,7 +96,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // Initializing statistics list to record the query statistics // creating copy on write to handle concurrent scenario queryProperties.queryStatisticsRecorder = - CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()); + CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()); queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder); QueryUtil.resolveQueryModel(queryModel); QueryStatistic queryStatistic = new QueryStatistic(); @@ -143,9 +149,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // and measure column start index queryProperties.aggExpressionStartIndex = queryModel.getQueryMeasures().size(); queryProperties.measureStartIndex = aggTypes.length - queryModel.getQueryMeasures().size(); + queryProperties.filterMeasures = new HashSet<>(); + queryProperties.complexFilterDimension = new HashSet<>(); + QueryUtil.getAllFilterDimensions(queryModel.getFilterExpressionResolverTree(), + queryProperties.complexFilterDimension, queryProperties.filterMeasures); - queryProperties.complexFilterDimension = - QueryUtil.getAllFilterDimensions(queryModel.getFilterExpressionResolverTree()); queryStatistic = new QueryStatistic(); // dictionary column 
unique column id to dictionary mapping // which will be used to get column actual data @@ -314,13 +322,38 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { List<CarbonMeasure> expressionMeasures = new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); // setting all the dimension chunk indexes to be read from file - blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(QueryUtil - .getDimensionsBlockIndexes(updatedQueryDimension, - segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions)); - // setting all the measure chunk indexes to be read from file - blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(QueryUtil + int numberOfElementToConsider = 0; + int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension, + segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions, + queryProperties.complexFilterDimension); + if (dimensionsBlockIndexes.length > 0) { + numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1] + == segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ? + dimensionsBlockIndexes.length - 1 : + dimensionsBlockIndexes.length; + blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil + .getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider, + CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO)); + } else { + blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]); + } + + int[] measureBlockIndexes = QueryUtil .getMeasureBlockIndexes(queryModel.getQueryMeasures(), expressionMeasures, - segmentProperties.getMeasuresOrdinalToBlockMapping())); + segmentProperties.getMeasuresOrdinalToBlockMapping(), queryProperties.filterMeasures); + if (measureBlockIndexes.length > 0) { + + numberOfElementToConsider = measureBlockIndexes[measureBlockIndexes.length - 1] + == segmentProperties.getMeasures().size() - 1 ? 
+ measureBlockIndexes.length - 1 : + measureBlockIndexes.length; + // setting all the measure chunk indexes to be read from file + blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil + .getRangeIndex(measureBlockIndexes, numberOfElementToConsider, + CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO)); + } else { + blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]); + } // setting the key structure info which will be required // to update the older block key with new key generator blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);