Repository: carbondata Updated Branches: refs/heads/encoding_override 9e4da2a6c -> fecafde85
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java index fab1a39..0f1b52b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java @@ -24,17 +24,17 @@ import java.nio.channels.FileChannel; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage; import org.apache.carbondata.core.datastore.columnar.IndexStorage; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.page.encoding.EncodedData; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage; +import org.apache.carbondata.core.datastore.page.key.TablePageKey; +import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.core.writer.CarbonFooterWriter; import org.apache.carbondata.format.FileFooter; -import org.apache.carbondata.processing.store.TablePageKey; -import org.apache.carbondata.processing.store.TablePageStatistics; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; @@ -47,12 +47,11 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { super(dataWriterVo); } - @Override - public NodeHolder buildDataNodeHolder(EncodedData encoded, - TablePageStatistics stats, TablePageKey key) + protected NodeHolder buildNodeHolder(EncodedTablePage encodedTablePage) throws CarbonDataWriterException { // if there are no NO-Dictionary column present in the table then // set the empty byte array + TablePageKey key = encodedTablePage.getPageKey(); byte[] startKey = key.getStartKey(); byte[] endKey = key.getEndKey(); byte[] noDictionaryStartKey = key.getNoDictStartKey(); @@ -70,46 +69,48 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { int totalKeySize = 0; int keyBlockSize = 0; - IndexStorage[] keyStorageArray = encoded.indexStorages; - boolean[] isSortedData = new boolean[keyStorageArray.length]; - int[] keyLengths = new int[keyStorageArray.length]; - byte[][] allMinValue = new byte[keyStorageArray.length][]; - byte[][] allMaxValue = new byte[keyStorageArray.length][]; - boolean[] colGrpBlock = new boolean[keyStorageArray.length]; - byte[][] keyBlockData = encoded.dimensions; - byte[][] measureArray = encoded.measures; + int numDimensions = encodedTablePage.getNumDimensions(); + boolean[] isSortedData = new boolean[numDimensions]; + int[] keyLengths = new int[numDimensions]; + int[] keyBlockIdxLengths = new int[numDimensions]; + byte[][] allMinValue = new byte[numDimensions][]; + byte[][] allMaxValue = new byte[numDimensions][]; + byte[][] keyBlockData = NodeHolder.getKeyArray(encodedTablePage); + byte[][] measureArray = NodeHolder.getDataArray(encodedTablePage); + TablePageStatistics stats = new TablePageStatistics(encodedTablePage.getDimensions(), + encodedTablePage.getMeasures()); - for (int i = 0; i < keyLengths.length; i++) { - keyLengths[i] = keyBlockData[i].length; - isSortedData[i] = keyStorageArray[i].isAlreadySorted(); + EncodedDimensionPage[] dimensions = encodedTablePage.getDimensions(); + for (int i = 0; i < dimensions.length; i++) { + IndexStorage indexStorage = dimensions[i].getIndexStorage(); + keyLengths[i] = dimensions[i].getEncodedData().length; + isSortedData[i] = indexStorage.isAlreadySorted(); if (!isSortedData[i]) { keyBlockSize++; } totalKeySize += keyLengths[i]; + byte[] min = stats.getDimensionMinValue()[i]; + byte[] max = stats.getDimensionMaxValue()[i]; if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) { - allMinValue[i] = keyStorageArray[i].getMin(); - allMaxValue[i] = keyStorageArray[i].getMax(); + allMinValue[i] = min; + allMaxValue[i] = max; } else { - allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin()); - allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax()); - } - //if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk - if (keyStorageArray[i] instanceof ColGroupBlockStorage) { - colGrpBlock[i] = true; + allMinValue[i] = updateMinMaxForNoDictionary(min); + allMaxValue[i] = updateMinMaxForNoDictionary(max); } } - int[] keyBlockIdxLengths = new int[keyBlockSize]; byte[][] dataAfterCompression = new byte[keyBlockSize][]; byte[][] indexMap = new byte[keyBlockSize][]; int idx = 0; - for (int i = 0; i < isSortedData.length; i++) { + for (int i = 0; i < dimensions.length; i++) { + IndexStorage indexStorage = dimensions[i].getIndexStorage(); if (!isSortedData[i]) { dataAfterCompression[idx] = - numberCompressor.compress((int[])keyStorageArray[i].getRowIdPage()); - if (null != keyStorageArray[i].getRowIdRlePage() - && ((int[])keyStorageArray[i].getRowIdRlePage()).length > 0) { - indexMap[idx] = numberCompressor.compress((int[])keyStorageArray[i].getRowIdRlePage()); + numberCompressor.compress((int[])indexStorage.getRowIdPage()); + if (null != indexStorage.getRowIdRlePage() + && ((int[])indexStorage.getRowIdRlePage()).length > 0) { + indexMap[idx] = numberCompressor.compress((int[])indexStorage.getRowIdRlePage()); } else { indexMap[idx] = new byte[0]; } @@ -128,10 +129,11 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { int[] dataIndexMapLength = new int[compressDataBlockSize]; idx = 0; for (int i = 0; i < dataWriterVo.getRleEncodingForDictDim().length; i++) { + IndexStorage indexStorage = dimensions[i].getIndexStorage(); if (dataWriterVo.getRleEncodingForDictDim()[i]) { try { compressedDataIndex[idx] = - numberCompressor.compress((int[])keyStorageArray[i].getDataRlePage()); + numberCompressor.compress((int[])indexStorage.getDataRlePage()); dataIndexMapLength[idx] = compressedDataIndex[idx].length; idx++; } catch (Exception e) { @@ -154,7 +156,8 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { holder.setMeasureNullValueIndex(stats.getNullBitSet()); // end key format will be <length of dictionary key><length of no // dictionary key><DictionaryKey><No Dictionary key> - byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey); + byte[] updatedNoDictionaryEndKey = + encodedTablePage.getPageKey().updateNoDictionaryStartAndEndKey(noDictionaryEndKey); ByteBuffer buffer = ByteBuffer.allocate( CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE + endKey.length + updatedNoDictionaryEndKey.length); @@ -165,7 +168,8 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { buffer.rewind(); holder.setEndKey(buffer.array()); holder.setMeasureLenght(msrLength); - byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey); + byte[] updatedNoDictionaryStartKey = + encodedTablePage.getPageKey().updateNoDictionaryStartAndEndKey(noDictionaryStartKey); // start key format will be <length of dictionary key><length of no // dictionary key><DictionaryKey><No Dictionary key> buffer = ByteBuffer.allocate( @@ -185,38 +189,28 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { holder.setCompressedIndexMap(indexMap); holder.setDataIndexMapLength(dataIndexMapLength); holder.setCompressedDataIndex(compressedDataIndex); - holder.setMeasureStats(stats.getMeasurePageStatistics()); holder.setTotalDimensionArrayLength(totalKeySize); holder.setTotalMeasureArrayLength(totalMsrArrySize); //setting column min max value holder.setDimensionColumnMaxData(allMaxValue); holder.setDimensionColumnMinData(allMinValue); holder.setRleEncodingForDictDim(dataWriterVo.getRleEncodingForDictDim()); - holder.setColGrpBlocks(colGrpBlock); + holder.setEncodedData(encodedTablePage); return holder; } - @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException { - if (holder.getEntryCount() == 0) { + @Override public void writeTablePage(EncodedTablePage encodedTablePage) + throws CarbonDataWriterException { + if (encodedTablePage.getPageSize() == 0) { return; } - int indexBlockSize = 0; - for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) { - indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE; - } - - for (int i = 0; i < holder.getDataIndexMapLength().length; i++) { - indexBlockSize += holder.getDataIndexMapLength()[i]; - } - - long blockletDataSize = - holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() - + indexBlockSize; + long blockletDataSize = encodedTablePage.getEncodedSize(); updateBlockletFileChannel(blockletDataSize); + NodeHolder nodeHolder = buildNodeHolder(encodedTablePage); // write data to file and get its offset - long offset = writeDataToFile(holder, fileChannel); + long offset = writeDataToFile(nodeHolder, fileChannel); // get the blocklet info for currently added blocklet - BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset); + BlockletInfoColumnar blockletInfo = getBlockletInfo(nodeHolder, offset); // add blocklet info to list blockletInfoList.add(blockletInfo); LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte"); @@ -231,6 +225,7 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { */ private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel) throws CarbonDataWriterException { + int numDimensions = nodeHolder.getKeyArray().length; // create byte buffer byte[][] compressedIndex = nodeHolder.getCompressedIndex(); byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap(); @@ -262,16 +257,17 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { // add measure data array to byte buffer ByteBuffer buffer1 = null; - for (int i = 0; i < compressedIndex.length; i++) { - buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]); - buffer1.putInt(compressedIndex[i].length); - buffer1.put(compressedIndex[i]); - if (compressedIndexMap[i].length > 0) { - buffer1.put(compressedIndexMap[i]); + for (int i = 0; i < numDimensions; i++) { + if (nodeHolder.getKeyBlockIndexLength()[i] > 0) { + buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]); + buffer1.putInt(compressedIndex[i].length); + buffer1.put(compressedIndex[i]); + if (compressedIndexMap[i].length > 0) { + buffer1.put(compressedIndexMap[i]); + } + buffer1.rewind(); + byteBuffer.put(buffer1.array()); } - buffer1.rewind(); - byteBuffer.put(buffer1.array()); - } for (int i = 0; i < compressedDataIndex.length; i++) { byteBuffer.put(compressedDataIndex[i]); @@ -356,12 +352,7 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { info.setStartKey(nodeHolder.getStartKey()); // set end key info.setEndKey(nodeHolder.getEndKey()); - info.setStats(nodeHolder.getStats()); - // return leaf metadata - - //colGroup Blocks - info.setColGrpBlocks(nodeHolder.getColGrpBlocks()); - + info.setEncodedTablePage(nodeHolder.getEncodedData()); return info; } @@ -374,7 +365,7 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { long currentPosition = channel.size(); CarbonFooterWriter writer = new CarbonFooterWriter(filePath); FileFooter convertFileMeta = CarbonMetadataUtil - .convertFileFooter(blockletInfoList, localCardinality.length, localCardinality, + .convertFileFooter(blockletInfoList, localCardinality, thriftColumnSchemaList, dataWriterVo.getSegmentProperties()); fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), carbonDataFileName, currentPosition); writer.writeFooter(convertFileMeta, currentPosition); http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java index 82e83d5..65af57f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java @@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.util.CarbonMetadataUtil; @@ -61,17 +62,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { /** * Below method will be used to write the data to carbon data file * - * @param holder + * @param encodedTablePage * @throws CarbonDataWriterException any problem in writing operation */ - @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException { - if (holder.getEntryCount() == 0) { + @Override public void writeTablePage(EncodedTablePage encodedTablePage) + throws CarbonDataWriterException { + NodeHolder nodeHolder = buildNodeHolder(encodedTablePage); + if (encodedTablePage.getPageSize() == 0) { return; } // size to calculate the size of the blocklet int size = 0; // get the blocklet info object - BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0); + BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0); List<DataChunk2> datachunks = null; try { @@ -89,16 +92,16 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { size += dataChunkByteArray[i].length; } // add row id index length - for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) { - size += holder.getKeyBlockIndexLength()[i]; + for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) { + size += nodeHolder.getKeyBlockIndexLength()[i]; } // add rle index length - for (int i = 0; i < holder.getDataIndexMapLength().length; i++) { - size += holder.getDataIndexMapLength()[i]; + for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) { + size += nodeHolder.getDataIndexMapLength()[i]; } // add dimension column data page and measure column data page size long blockletDataSize = - holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size; + nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size; // if size of the file already reached threshold size then create a new file and get the file // channel object updateBlockletFileChannel(blockletDataSize); @@ -117,7 +120,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { throw new CarbonDataWriterException("Problem while getting the file channel size", e); } // write data to file and get its offset - writeDataToFile(holder, dataChunkByteArray, fileChannel); + writeDataToFile(nodeHolder, dataChunkByteArray, fileChannel); // add blocklet info to list blockletInfoList.add(blockletInfo); LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte"); @@ -132,10 +135,6 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { * <MColumn1DataChunk><MColumn1DataPage> * <MColumn2DataChunk><MColumn2DataPage> * <MColumn2DataChunk><MColumn2DataPage> - * - * @param nodeHolder - * @param dataChunksBytes - * @param channel * @throws CarbonDataWriterException */ private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel) @@ -156,11 +155,15 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) { currentDataChunksOffset.add(offset); currentDataChunksLength.add((short) dataChunksBytes[i].length); - bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder - .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + ( - dataWriterVo.getRleEncodingForDictDim()[i] ? - nodeHolder.getCompressedDataIndex()[rleIndex].length : - 0); + int size1 = (!nodeHolder.getIsSortedKeyBlock()[i] ? + nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : + 0); + int size2 = (dataWriterVo.getRleEncodingForDictDim()[i] ? + nodeHolder.getCompressedDataIndex()[rleIndex].length : + 0); + bufferSize += dataChunksBytes[i].length + + nodeHolder.getKeyLengths()[i] + + size1 + size2; offset += dataChunksBytes[i].length; offset += nodeHolder.getKeyLengths()[i]; if (!nodeHolder.getIsSortedKeyBlock()[i]) { @@ -180,14 +183,16 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { buffer.put(nodeHolder.getKeyArray()[i]); if (!nodeHolder.getIsSortedKeyBlock()[i]) { buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length); - buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]); + byte[] b1 = nodeHolder.getCompressedIndex()[rowIdIndex]; + buffer.put(b1); if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) { buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]); } rowIdIndex++; } if (dataWriterVo.getRleEncodingForDictDim()[i]) { - buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]); + byte[] b2 = nodeHolder.getCompressedDataIndex()[rleIndex]; + buffer.put(b2); rleIndex++; } } @@ -230,7 +235,9 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { * * @return BlockletInfo - blocklet metadata */ - protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) { + protected BlockletInfoColumnar getBlockletInfo(EncodedTablePage encodedTablePage, long offset) { + NodeHolder nodeHolder = buildNodeHolder(encodedTablePage); + // create the info object for leaf entry BlockletInfoColumnar info = new BlockletInfoColumnar(); //add rleEncodingForDictDim array @@ -256,12 +263,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { info.setStartKey(nodeHolder.getStartKey()); // set end key info.setEndKey(nodeHolder.getEndKey()); - info.setStats(nodeHolder.getStats()); - // return leaf metadata - - //colGroup Blocks - info.setColGrpBlocks(nodeHolder.getColGrpBlocks()); - + info.setEncodedTablePage(encodedTablePage); return info; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index 9afbb55..adb97ae 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -26,9 +26,10 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; -import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.page.encoding.EncodedData; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage; import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex; import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; import org.apache.carbondata.core.metadata.index.BlockIndexInfo; @@ -36,11 +37,8 @@ import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.format.BlockletInfo3; import org.apache.carbondata.format.FileFooter3; -import org.apache.carbondata.processing.store.TablePageKey; -import org.apache.carbondata.processing.store.TablePageStatistics; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; @@ -77,181 +75,6 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> dataWriterHolder = new DataWriterHolder(); } - /** - * Below method will be used to build the node holder object - * This node holder object will be used to persist data which will - * be written in carbon data file - */ - @Override public NodeHolder buildDataNodeHolder(EncodedData encoded, - TablePageStatistics stats, TablePageKey key) throws CarbonDataWriterException { - // if there are no NO-Dictionary column present in the table then - // set the empty byte array - byte[] startKey = key.getStartKey(); - byte[] endKey = key.getEndKey(); - byte[] noDictionaryStartKey = key.getNoDictStartKey(); - byte[] noDictionaryEndKey = key.getNoDictEndKey(); - if (null == noDictionaryEndKey) { - noDictionaryEndKey = new byte[0]; - } - if (null == noDictionaryStartKey) { - noDictionaryStartKey = new byte[0]; - } - // total measure length; - int totalMsrArrySize = 0; - // current measure length; - int currentMsrLenght = 0; - int numDimensions = encoded.dimensions.length; - int totalKeySize = 0; - boolean[] isSortedData = new boolean[numDimensions]; - int[] keyLengths = new int[numDimensions]; - boolean[] colGrpBlock = new boolean[numDimensions]; - int[] keyBlockIdxLengths = new int[numDimensions]; - byte[][] dataAfterCompression = new byte[numDimensions][]; - byte[][] indexMap = new byte[numDimensions][]; - for (int i = 0; i < numDimensions; i++) { - isSortedData[i] = encoded.indexStorages[i].isAlreadySorted(); - keyLengths[i] = encoded.dimensions[i].length; - totalKeySize += keyLengths[i]; - if (!isSortedData[i]) { - dataAfterCompression[i] = - getByteArray((short[])encoded.indexStorages[i].getRowIdPage()); - if (null != encoded.indexStorages[i].getRowIdRlePage() && - ((short[])encoded.indexStorages[i].getRowIdRlePage()).length > 0) { - indexMap[i] = getByteArray((short[])encoded.indexStorages[i].getRowIdRlePage()); - } else { - indexMap[i] = new byte[0]; - } - keyBlockIdxLengths[i] = (dataAfterCompression[i].length + indexMap[i].length) - + CarbonCommonConstants.INT_SIZE_IN_BYTE; - } - // if keyStorageArray is instance of ColGroupBlockStorage than it's colGroup chunk - if (encoded.indexStorages[i] instanceof ColGroupBlockStorage) { - colGrpBlock[i] = true; - } - } - byte[][] compressedDataIndex = new byte[numDimensions][]; - int[] dataIndexMapLength = new int[numDimensions]; - for (int i = 0; i < dataWriterVo.getRleEncodingForDictDim().length; i++) { - if (dataWriterVo.getRleEncodingForDictDim()[i]) { - try { - compressedDataIndex[i] = - getByteArray((short[])encoded.indexStorages[i].getDataRlePage()); - dataIndexMapLength[i] = compressedDataIndex[i].length; - } catch (Exception e) { - throw new CarbonDataWriterException(e.getMessage(), e); - } - } - } - int[] msrLength = new int[dataWriterVo.getMeasureCount()]; - // calculate the total size required for all the measure and get the - // each measure size - for (int i = 0; i < encoded.measures.length; i++) { - currentMsrLenght = encoded.measures[i].length; - totalMsrArrySize += currentMsrLenght; - msrLength[i] = currentMsrLenght; - } - NodeHolder holder = new NodeHolder(); - holder.setDataArray(encoded.measures); - holder.setKeyArray(encoded.dimensions); - holder.setMeasureNullValueIndex(stats.getNullBitSet()); - // end key format will be <length of dictionary key><length of no - // dictionary key><DictionaryKey><No Dictionary key> - byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey); - ByteBuffer buffer = ByteBuffer.allocate( - CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE - + endKey.length + updatedNoDictionaryEndKey.length); - buffer.putInt(endKey.length); - buffer.putInt(updatedNoDictionaryEndKey.length); - buffer.put(endKey); - buffer.put(updatedNoDictionaryEndKey); - buffer.rewind(); - holder.setEndKey(buffer.array()); - holder.setMeasureLenght(msrLength); - byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey); - // start key format will be <length of dictionary key><length of no - // dictionary key><DictionaryKey><No Dictionary key> - buffer = ByteBuffer.allocate( - CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE - + startKey.length + updatedNoDictionaryStartKey.length); - buffer.putInt(startKey.length); - buffer.putInt(updatedNoDictionaryStartKey.length); - buffer.put(startKey); - buffer.put(updatedNoDictionaryStartKey); - buffer.rewind(); - holder.setStartKey(buffer.array()); - holder.setEntryCount(key.getPageSize()); - holder.setKeyLengths(keyLengths); - holder.setKeyBlockIndexLength(keyBlockIdxLengths); - holder.setIsSortedKeyBlock(isSortedData); - holder.setCompressedIndex(dataAfterCompression); - holder.setCompressedIndexMap(indexMap); - holder.setDataIndexMapLength(dataIndexMapLength); - holder.setCompressedDataIndex(compressedDataIndex); - holder.setMeasureStats(stats.getMeasurePageStatistics()); - holder.setTotalDimensionArrayLength(totalKeySize); - holder.setTotalMeasureArrayLength(totalMsrArrySize); - holder.setMeasureColumnMaxData(stats.getMeasureMaxValue()); - holder.setMeasureColumnMinData(stats.getMeasureMinValue()); - holder.setDimensionColumnMaxData(stats.getDimensionMaxValue()); - holder.setDimensionColumnMinData(stats.getDimensionMinValue()); - holder.setRleEncodingForDictDim(dataWriterVo.getRleEncodingForDictDim()); - holder.setColGrpBlocks(colGrpBlock); - List<byte[]> dimensionDataChunk2 = null; - List<byte[]> measureDataChunk2 = null; - try { - dimensionDataChunk2 = CarbonMetadataUtil - .getDataChunk2(holder, thriftColumnSchemaList, dataWriterVo.getSegmentProperties(), true); - measureDataChunk2 = CarbonMetadataUtil - .getDataChunk2(holder, thriftColumnSchemaList, dataWriterVo.getSegmentProperties(), - false); - - } catch (IOException e) { - throw new CarbonDataWriterException(e.getMessage()); - } - holder.setHolderSize(calculateSize(holder, dimensionDataChunk2, measureDataChunk2)); - return holder; - } - - private int calculateSize(NodeHolder holder, List<byte[]> dimensionDataChunk2, - List<byte[]> measureDataChunk2) { - int size = 0; - // add row id index length - for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) { - if (!holder.getIsSortedKeyBlock()[i]) { - size += holder.getKeyBlockIndexLength()[i]; - } - } - // add rle index length - for (int i = 0; i < holder.getDataIndexMapLength().length; i++) { - if (holder.getRleEncodingForDictDim()[i]) { - size += holder.getDataIndexMapLength()[i]; - } - } - for (int i = 0; i < dimensionDataChunk2.size(); i++) { - size += dimensionDataChunk2.get(i).length; - } - for (int i = 0; i < measureDataChunk2.size(); i++) { - size += measureDataChunk2.get(i).length; - } - size += holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength(); - return size; - } - - /** - * Below method will be used to convert short array to byte array - * - * @param data in short data - * @return byte array - */ - private byte[] getByteArray(short[] data) { - ByteBuffer buffer = ByteBuffer.allocate(data.length * 2); - for (int i = 0; i < data.length; i++) { - buffer.putShort(data[i]); - } - buffer.flip(); - return buffer.array(); - } - @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath) throws CarbonDataWriterException { try { @@ -277,65 +100,65 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> } /** - * Below method will be used to write blocklet data to file + * Below method will be used to write one table page data */ - @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException { - // check the number of pages present in data holder, if pages is exceeding threshold - // it will write the pages to file + @Override public void writeTablePage(EncodedTablePage encodedTablePage) + throws CarbonDataWriterException { // condition for writting all the pages - if (!holder.isWriteAll()) { + if (!encodedTablePage.isLastPage()) { boolean isAdded = false; - // check if size more than blocklet size then write the page - if (dataWriterHolder.getSize() + holder.getHolderSize() >= blockletSize) { + // check if size more than blocklet size then write the page to file + if (dataWriterHolder.getSize() + encodedTablePage.getEncodedSize() >= blockletSize) { // if one page size is more than blocklet size - if (dataWriterHolder.getNodeHolder().size() == 0) { + if (dataWriterHolder.getEncodedTablePages().size() == 0) { isAdded = true; - dataWriterHolder.addNodeHolder(holder); + dataWriterHolder.addPage(encodedTablePage); } LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded() + " :Rows Added: " + dataWriterHolder.getTotalRows()); // write the data - writeDataToFile(fileChannel); + writeBlockletToFile(); } if (!isAdded) { - dataWriterHolder.addNodeHolder(holder); + dataWriterHolder.addPage(encodedTablePage); } } else { //for last blocklet check if the last page will exceed the blocklet size then write // existing pages and then last page - if (holder.getEntryCount() > 0) { - dataWriterHolder.addNodeHolder(holder); + if (encodedTablePage.getPageSize() > 0) { + dataWriterHolder.addPage(encodedTablePage); } if (dataWriterHolder.getNumberOfPagesAdded() > 0) { LOGGER.info("Number of Pages for blocklet is: " + dataWriterHolder.getNumberOfPagesAdded() + " :Rows Added: " + dataWriterHolder.getTotalRows()); - writeDataToFile(fileChannel); + writeBlockletToFile(); } } } - private void writeDataToFile(FileChannel channel) { - // get the list of node holder list - List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder(); + /** + * Write one blocklet data to file + */ + private void writeBlockletToFile() { + // get the list of all encoded table page + List<EncodedTablePage> encodedTablePageList = dataWriterHolder.getEncodedTablePages(); + int numDimensions = encodedTablePageList.get(0).getNumDimensions(); + int numMeasures = encodedTablePageList.get(0).getNumMeasures(); long blockletDataSize = 0; // get data chunks for all the column - byte[][] dataChunkBytes = - new byte[nodeHolderList.get(0).getKeyArray().length + nodeHolderList.get(0) - .getDataArray().length][]; - int measureStartIndex = nodeHolderList.get(0).getKeyArray().length; + byte[][] dataChunkBytes = new byte[numDimensions + numMeasures][]; + int measureStartIndex = numDimensions; // calculate the size of data chunks try { - for (int i = 0; i < nodeHolderList.get(0).getKeyArray().length; i++) { + for (int i = 0; i < numDimensions; i++) { dataChunkBytes[i] = CarbonUtil.getByteArray( - CarbonMetadataUtil.getDataChunk3(nodeHolderList, thriftColumnSchemaList, - dataWriterVo.getSegmentProperties(), i, true)); + CarbonMetadataUtil.getDimensionDataChunk3(encodedTablePageList, i)); blockletDataSize += dataChunkBytes[i].length; } - for (int i = 0; i < nodeHolderList.get(0).getDataArray().length; i++) { - dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(CarbonMetadataUtil - .getDataChunk3(nodeHolderList, thriftColumnSchemaList, - dataWriterVo.getSegmentProperties(), i, false)); + for (int i = 0; i < numMeasures; i++) { + dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray( + CarbonMetadataUtil.getMeasureDataChunk3(encodedTablePageList, i)); blockletDataSize += dataChunkBytes[measureStartIndex].length; measureStartIndex++; } @@ -346,117 +169,96 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> blockletDataSize += dataWriterHolder.getSize(); // to check if data size will exceed the block size then create a new file updateBlockletFileChannel(blockletDataSize); + // write data to file - writeDataToFile(fileChannel, dataChunkBytes); + try { + if (fileChannel.size() == 0) { + // write the header if file is empty + writeHeaderToFile(fileChannel); + } + writeBlockletToFile(fileChannel, dataChunkBytes); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem when writing file", e); + } // clear the data holder dataWriterHolder.clear(); } /** - * Below method will be used to write data in carbon data file - * Data Format + * write file header + */ + private void writeHeaderToFile(FileChannel channel) throws IOException { + byte[] fileHeader = CarbonUtil.getByteArray( + CarbonMetadataUtil.getFileHeader( + true, thriftColumnSchemaList, dataWriterVo.getSchemaUpdatedTimeStamp())); + ByteBuffer buffer = ByteBuffer.wrap(fileHeader); + channel.write(buffer); + } + + /** + * Write one blocklet data into file + * File format: * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>> * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>> * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>> * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>> - * Each page will contain column data, Inverted index and rle index - * - * @param channel - * @param dataChunkBytes */ - private void writeDataToFile(FileChannel channel, byte[][] dataChunkBytes) { - long offset = 0; - // write the header - try { - if (fileChannel.size() == 0) { - // below code is to write the file header - byte[] fileHeader = CarbonUtil.getByteArray(CarbonMetadataUtil - .getFileHeader(true, thriftColumnSchemaList, dataWriterVo.getSchemaUpdatedTimeStamp())); - ByteBuffer buffer = ByteBuffer.wrap(fileHeader); - fileChannel.write(buffer); - } - offset = channel.size(); - } catch (IOException e) { - throw new CarbonDataWriterException("Problem while getting the file channel size"); - } + private void writeBlockletToFile(FileChannel channel, byte[][] dataChunkBytes) + throws IOException { + long offset = channel.size(); // to maintain the offset of each data chunk in blocklet List<Long> currentDataChunksOffset = new ArrayList<>(); // to maintain the length of each data chunk in blocklet List<Integer> currentDataChunksLength = new ArrayList<>(); - // get the node holder list - List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder(); - int numberOfDimension = nodeHolderList.get(0).getKeyArray().length; - int numberOfMeasures = nodeHolderList.get(0).getDataArray().length; - NodeHolder nodeHolder = null; + List<EncodedTablePage> encodedTablePages = dataWriterHolder.getEncodedTablePages(); + int numberOfDimension = encodedTablePages.get(0).getNumDimensions(); + int numberOfMeasures = encodedTablePages.get(0).getNumMeasures(); ByteBuffer buffer = null; - int bufferSize = 0; long dimensionOffset = 0; long measureOffset = 0; int numberOfRows = 0; - long totalSize = 0; // calculate the number of rows in each blocklet - for (int j = 0; j < nodeHolderList.size(); j++) { - numberOfRows += nodeHolderList.get(j).getEntryCount(); - totalSize += nodeHolderList.get(j).getHolderSize(); + for (EncodedTablePage encodedTablePage : encodedTablePages) { + numberOfRows += encodedTablePage.getPageSize(); } - try { - for (int i = 0; i < numberOfDimension; i++) { - currentDataChunksOffset.add(offset); - currentDataChunksLength.add(dataChunkBytes[i].length); - buffer = ByteBuffer.wrap(dataChunkBytes[i]); - fileChannel.write(buffer); - offset += dataChunkBytes[i].length; - for (int j = 0; j < nodeHolderList.size(); j++) { - nodeHolder = nodeHolderList.get(j); - bufferSize = nodeHolder.getKeyLengths()[i] + (!nodeHolder.getIsSortedKeyBlock()[i] ? - nodeHolder.getKeyBlockIndexLength()[i] : - 0) + (dataWriterVo.getRleEncodingForDictDim()[i] ? - nodeHolder.getCompressedDataIndex()[i].length : - 0); - buffer = ByteBuffer.allocate(bufferSize); - buffer.put(nodeHolder.getKeyArray()[i]); - if (!nodeHolder.getIsSortedKeyBlock()[i]) { - buffer.putInt(nodeHolder.getCompressedIndex()[i].length); - buffer.put(nodeHolder.getCompressedIndex()[i]); - if (nodeHolder.getCompressedIndexMap()[i].length > 0) { - buffer.put(nodeHolder.getCompressedIndexMap()[i]); - } - } - if (nodeHolder.getRleEncodingForDictDim()[i]) { - buffer.put(nodeHolder.getCompressedDataIndex()[i]); - } - buffer.flip(); - fileChannel.write(buffer); - offset += bufferSize; - } + for (int i = 0; i < numberOfDimension; i++) { + currentDataChunksOffset.add(offset); + currentDataChunksLength.add(dataChunkBytes[i].length); + buffer = ByteBuffer.wrap(dataChunkBytes[i]); + channel.write(buffer); + offset += dataChunkBytes[i].length; + for (EncodedTablePage encodedTablePage : encodedTablePages) { + EncodedDimensionPage dimension = encodedTablePage.getDimension(i); + int bufferSize = dimension.getSerializedSize(); + buffer = dimension.serialize(); + channel.write(buffer); + offset += bufferSize; } - dimensionOffset = offset; - int dataChunkStartIndex = nodeHolderList.get(0).getKeyArray().length; - for (int i = 0; i < numberOfMeasures; i++) { - nodeHolderList = dataWriterHolder.getNodeHolder(); - currentDataChunksOffset.add(offset); - currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length); - buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]); - fileChannel.write(buffer); - offset += dataChunkBytes[dataChunkStartIndex].length; - dataChunkStartIndex++; - for (int j = 0; j < nodeHolderList.size(); j++) { - nodeHolder = nodeHolderList.get(j); - bufferSize = nodeHolder.getDataArray()[i].length; - buffer = ByteBuffer.wrap(nodeHolder.getDataArray()[i]); - fileChannel.write(buffer); - offset += bufferSize; - } + } + dimensionOffset = offset; + int dataChunkStartIndex = encodedTablePages.get(0).getNumDimensions(); + for (int i = 0; i < numberOfMeasures; i++) { + currentDataChunksOffset.add(offset); + currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length); + buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]); + channel.write(buffer); + offset += dataChunkBytes[dataChunkStartIndex].length; + dataChunkStartIndex++; + for (EncodedTablePage encodedTablePage : encodedTablePages) { + EncodedMeasurePage measure = encodedTablePage.getMeasure(i); + int bufferSize = measure.getSerializedSize(); + buffer = measure.serialize(); + channel.write(buffer); + offset += bufferSize; } - measureOffset = offset; - } catch (IOException e) { - throw new CarbonDataWriterException("Problem while writing the data", e); } - blockletIndex.add(CarbonMetadataUtil - .getBlockletIndex(nodeHolderList, dataWriterVo.getSegmentProperties().getMeasures())); + measureOffset = offset; + blockletIndex.add( + CarbonMetadataUtil.getBlockletIndex( + encodedTablePages, dataWriterVo.getSegmentProperties().getMeasures())); BlockletInfo3 blockletInfo3 = new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength, - dimensionOffset, measureOffset, dataWriterHolder.getNodeHolder().size()); + dimensionOffset, measureOffset, dataWriterHolder.getEncodedTablePages().size()); blockletMetadata.add(blockletInfo3); } @@ -538,7 +340,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> closeExecutorService(); } - @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException { + @Override public void writeFooterToFile() throws CarbonDataWriterException { if (this.blockletMetadata.size() > 0) { writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java index a98f388..246fa86 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java @@ -19,24 +19,24 @@ package org.apache.carbondata.processing.store.writer.v3; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.util.NodeHolder; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; public class DataWriterHolder { - private List<NodeHolder> nodeHolder; + private List<EncodedTablePage> encodedTablePage; private long currentSize; public DataWriterHolder() { - this.nodeHolder = new ArrayList<NodeHolder>(); + this.encodedTablePage = new ArrayList<EncodedTablePage>(); } public void clear() { - nodeHolder.clear(); + encodedTablePage.clear(); currentSize = 0; } - public void addNodeHolder(NodeHolder holder) { - this.nodeHolder.add(holder); - currentSize += holder.getHolderSize(); + public void addPage(EncodedTablePage encodedTablePage) { + this.encodedTablePage.add(encodedTablePage); + currentSize += encodedTablePage.getEncodedSize(); } public long getSize() { @@ -45,18 +45,18 @@ public class DataWriterHolder { } public int getNumberOfPagesAdded() { - return nodeHolder.size(); + return encodedTablePage.size(); } public int getTotalRows() { int rows = 0; - for (NodeHolder nh : nodeHolder) { - rows += nh.getEntryCount(); + for (EncodedTablePage nh : encodedTablePage) { + rows += nh.getPageSize(); } return rows; } - public List<NodeHolder> getNodeHolder() { - return nodeHolder; + public List<EncodedTablePage> getEncodedTablePages() { + return encodedTablePage; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java deleted file mode 100644 index c634e7c..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/util/NonDictionaryUtil.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.util; - -import java.nio.ByteBuffer; - -import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; - -/** - * This is the utility class for No Dictionary changes. - */ -public class NonDictionaryUtil { - - /** - * This method will form one single byte [] for all the high card dims. - * For example if you need to pack 2 columns c1 and c2 , it stores in following way - * <total_len(short)><offsetLen(short)><offsetLen+c1_len(short)><c1(byte[])><c2(byte[])> - * @param byteBufferArr - * @return - */ - public static byte[] packByteBufferIntoSingleByteArray(byte[][] byteBufferArr) { - // for empty array means there is no data to remove dictionary. - if (null == byteBufferArr || byteBufferArr.length == 0) { - return null; - } - int noOfCol = byteBufferArr.length; - short toDetermineLengthOfByteArr = 2; - short offsetLen = (short) (noOfCol * 2 + toDetermineLengthOfByteArr); - int totalBytes = calculateTotalBytes(byteBufferArr) + offsetLen; - - ByteBuffer buffer = ByteBuffer.allocate(totalBytes); - - // write the length of the byte [] as first short - buffer.putShort((short) (totalBytes - toDetermineLengthOfByteArr)); - // writing the offset of the first element. - buffer.putShort(offsetLen); - - // prepare index for byte [] - for (int index = 0; index < byteBufferArr.length - 1; index++) { - int noOfBytes = byteBufferArr[index].length; - - buffer.putShort((short) (offsetLen + noOfBytes)); - offsetLen += noOfBytes; - } - - // put actual data. - for (int index = 0; index < byteBufferArr.length; index++) { - buffer.put(byteBufferArr[index]); - } - buffer.rewind(); - return buffer.array(); - - } - - /** - * To calculate the total bytes in byte Buffer[]. - * - * @param byteBufferArr - * @return - */ - private static int calculateTotalBytes(byte[][] byteBufferArr) { - int total = 0; - for (int index = 0; index < byteBufferArr.length; index++) { - total += byteBufferArr[index].length; - } - return total; - } - - /** - * Method to get the required Dimension from obj [] - * - * @param index - * @param row - * @return - */ - public static Integer getDimension(int index, Object[] row) { - - Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION]; - - return dimensions[index]; - - } - - /** - * Method to get the required measure from obj [] - * - * @param index - * @param row - * @return - */ - public static Object getMeasure(int index, Object[] row) { - Object[] measures = (Object[]) row[WriteStepRowUtil.MEASURE]; - return measures[index]; - } - - public static byte[] getByteArrayForNoDictionaryCols(Object[] row) { - - return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX]; - } - - public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr, - Object[] measureArray) { - - out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray; - out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr; - out[WriteStepRowUtil.MEASURE] = measureArray; - - } - - /** - * This method will extract the single dimension from the complete high card dims byte[].+ * - * The format of the byte [] will be, Totallength,CompleteStartOffsets,Dat - * - * @param highCardArr - * @param index - * @param highCardinalityCount - * @param outBuffer - */ - public static void extractSingleHighCardDims(byte[] highCardArr, int index, - int highCardinalityCount, ByteBuffer outBuffer) { - ByteBuffer buff = null; - short secIndex = 0; - short firstIndex = 0; - int length; - // if the requested index is a last one then we need to calculate length - // based on byte[] length. - if (index == highCardinalityCount - 1) { - // need to read 2 bytes(1 short) to determine starting offset and - // length can be calculated by array length. - buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2); - } else { - // need to read 4 bytes(2 short) to determine starting offset and - // length. - buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4); - } - - firstIndex = buff.getShort(); - // if it is a last dimension in high card then this will be last - // offset.so calculate length from total length - if (index == highCardinalityCount - 1) { - secIndex = (short) highCardArr.length; - } else { - secIndex = buff.getShort(); - } - - length = secIndex - firstIndex; - - outBuffer.position(firstIndex); - outBuffer.limit(outBuffer.position() + length); - - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 91cc195..cfba78b 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -165,7 +165,7 @@ public class StoreCreator { loadModel.setFactTimeStamp(System.currentTimeMillis()); loadModel.setMaxColumns("10"); - executeGraph(loadModel, absoluteTableIdentifier.getStorePath()); + loadData(loadModel, absoluteTableIdentifier.getStorePath()); } catch (Exception e) { e.printStackTrace(); @@ -355,7 +355,7 @@ public class StoreCreator { * @param storeLocation * @throws Exception */ - public static void executeGraph(CarbonLoadModel loadModel, String storeLocation) + public static void loadData(CarbonLoadModel loadModel, String storeLocation) throws Exception { new File(storeLocation).mkdirs(); String outPutLoc = storeLocation + "/etl";
