[CARBONDATA-2039] Add relative blocklet id during initialization in the blocklet data map
Problem: Currently, while loading blocklets into the data map, all the blocklets of a segment are stored in the unsafe store in a single array and are addressed by their array index. For example, if a segment has 7 tasks and each task has 10 part files with 3 blocklets each, the total number of blocklets in the segment is 210. The unsafe store keeps all of these blocklets in one array with start index 0 and end index 209. Because of this, the blocklet id used while filling the blocklet information is the array index, which can be any number from 0 to 209. As a result, the actual mapping of a blocklet to its position within its carbondata file is lost. Solution: Add the relative blocklet id while loading the blocklets into the unsafe store (the relative blocklet id is the id of the blocklet within its carbondata file). This closes #1796 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5ea538fe Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5ea538fe Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5ea538fe Branch: refs/heads/carbonstore Commit: 5ea538fe523749aa7f22629cd17a36c7a3e73b90 Parents: d1d726a Author: manishgupta88 <[email protected]> Authored: Fri Jan 12 18:28:56 2018 +0530 Committer: ravipesala <[email protected]> Committed: Thu Jan 18 22:36:00 2018 +0530 ---------------------------------------------------------------------- .../blockletindex/BlockletDataMap.java | 39 ++++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ea538fe/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index e1fa686..7b2c016 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -101,6 +101,8 @@ public class BlockletDataMap implements DataMap, Cacheable { private static int LOCATIONS = 10; + private static int BLOCKLET_ID_INDEX = 11; + private static int TASK_MIN_VALUES_INDEX = 0; private static int TASK_MAX_VALUES_INDEX = 1; @@ -130,6 +132,10 @@ public class BlockletDataMap implements DataMap, Cacheable { isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment(); DataMapRowImpl summaryRow = null; byte[] schemaBinary = null; + // below 2 variables will be used for fetching the relative blocklet id. Relative blocklet ID + // is id assigned to a blocklet within a part file + String tempFilePath = null; + int relativeBlockletId = 0; for (DataFileFooter fileFooter : indexInfo) { if (segmentProperties == null) { List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); @@ -153,9 +159,17 @@ public class BlockletDataMap implements DataMap, Cacheable { loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, locations); } else { + // blocklet ID will start from 0 again only when part file path is changed + if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) { + tempFilePath = blockInfo.getFilePath(); + relativeBlockletId = 0; + } summaryRow = loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, - locations); + locations, relativeBlockletId); + // this is done because relative blocklet id need to be incremented based on the + // total number of blocklets + relativeBlockletId += fileFooter.getBlockletList().size(); } } } @@ -176,7 +190,7 @@ public class BlockletDataMap implements DataMap, Cacheable { private DataMapRowImpl 
loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, - String[] locations) { + String[] locations, int relativeBlockletId) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); List<BlockletInfo> blockletList = fileFooter.getBlockletList(); CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); @@ -236,7 +250,9 @@ public class BlockletDataMap implements DataMap, Cacheable { // Add block footer offset, it is used if we need to read footer of block row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); setLocations(locations, row, ordinal); - + ordinal++; + // for relative blockelt id i.e blocklet id that belongs to a particular part file + row.setShort((short) relativeBlockletId++, ordinal); unsafeMemoryDMStore.addIndexRowToUnsafe(row); } catch (Exception e) { throw new RuntimeException(e); @@ -312,6 +328,11 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); try { setLocations(locations, row, ordinal); + ordinal++; + // for relative blocklet id. Value is -1 because in case of old store blocklet info will + // not be present in the index file and in that case we will not knwo the total number of + // blocklets + row.setShort((short) -1, ordinal); unsafeMemoryDMStore.addIndexRowToUnsafe(row); } catch (Exception e) { throw new RuntimeException(e); @@ -511,6 +532,9 @@ public class BlockletDataMap implements DataMap, Cacheable { // for locations indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); + // for relative blocklet id i.e. blocklet id that belongs to a particular part file. 
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); + unsafeMemoryDMStore = new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); } @@ -628,7 +652,7 @@ public class BlockletDataMap implements DataMap, Cacheable { int rowCount = unsafeMemoryDMStore.getRowCount(); for (int i = 0; i < rowCount; i++) { DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow(); - blocklets.add(createBlocklet(safeRow, i)); + blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX))); } } else { int startIndex = findStartIndex(convertToRow(searchStartKey), comparator); @@ -637,13 +661,14 @@ public class BlockletDataMap implements DataMap, Cacheable { FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); while (startIndex <= endIndex) { DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow(); + int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX); String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS); boolean isValid = addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, MAX_VALUES_INDEX), - getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, startIndex); + getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, blockletId); if (isValid) { - blocklets.add(createBlocklet(safeRow, startIndex)); + blocklets.add(createBlocklet(safeRow, blockletId)); } startIndex++; } @@ -712,7 +737,7 @@ public class BlockletDataMap implements DataMap, Cacheable { public ExtendedBlocklet getDetailedBlocklet(String blockletId) { int index = Integer.parseInt(blockletId); DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow(); - return createBlocklet(safeRow, index); + return createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX)); } private byte[][] getMinMaxValue(DataMapRow row, int index) {
