Problem: For old store the measure min and max values are written opposite (i.e min in place of max and max in place of min). Due to this computing of measure filter with current code is impacted. This problem specifically comes when measure data has negative values.
Impact Filter query on measure Solution In order to sync with current min and max values for old store, measures min and max value is reversed by using an old store flag. This closes #1879 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1248bd4b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1248bd4b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1248bd4b Branch: refs/heads/branch-1.3 Commit: 1248bd4b7ff4bb45392106082011dde7f9db460f Parents: ee1c4d4 Author: manishgupta88 <[email protected]> Authored: Tue Jan 30 08:56:13 2018 +0530 Committer: ravipesala <[email protected]> Committed: Thu Feb 1 22:13:36 2018 +0530 ---------------------------------------------------------------------- .../core/datastore/block/TableBlockInfo.java | 14 +++++ .../blockletindex/BlockletDMComparator.java | 2 +- .../blockletindex/BlockletDataMap.java | 61 +++++--------------- .../BlockletDataRefNodeWrapper.java | 39 ++++++++++++- .../executor/impl/AbstractQueryExecutor.java | 11 ++++ .../core/scan/filter/ColumnFilterInfo.java | 9 ++- .../apache/carbondata/core/util/CarbonUtil.java | 47 +++++++++++++++ .../carbondata/core/util/CarbonUtilTest.java | 46 +++++++++++++++ 8 files changed, 178 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index c3cc551..b27b5fc 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ 
-72,6 +72,12 @@ public class TableBlockInfo implements Distributable, Serializable { private String[] locations; private ColumnarFormatVersion version; + + /** + * flag to determine whether the data block is from old store (version 1.1) + * or current store + */ + private boolean isDataBlockFromOldStore; /** * The class holds the blockletsinfo */ @@ -410,4 +416,12 @@ public class TableBlockInfo implements Distributable, Serializable { public void setBlockletId(String blockletId) { this.blockletId = blockletId; } + + public boolean isDataBlockFromOldStore() { + return isDataBlockFromOldStore; + } + + public void setDataBlockFromOldStore(boolean dataBlockFromOldStore) { + isDataBlockFromOldStore = dataBlockFromOldStore; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java index fccbda8..9a50600 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java @@ -63,7 +63,7 @@ public class BlockletDMComparator implements Comparator<DataMapRow> { int compareResult = 0; int processedNoDictionaryColumn = numberOfNoDictSortColumns; byte[][] firstBytes = splitKey(first.getByteArray(0)); - byte[][] secondBytes = splitKey(first.getByteArray(0)); + byte[][] secondBytes = splitKey(second.getByteArray(0)); byte[] firstNoDictionaryKeys = firstBytes[1]; ByteBuffer firstNoDictionaryKeyBuffer = ByteBuffer.wrap(firstNoDictionaryKeys); byte[] secondNoDictionaryKeys = secondBytes[1]; 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index b097c66..699f9e1 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.BitSet; import java.util.Comparator; import java.util.List; @@ -48,7 +47,6 @@ import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; -import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; @@ -64,6 +62,7 @@ import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.DataTypeUtil; @@ -298,18 +297,23 @@ public class BlockletDataMap implements DataMap, Cacheable { 
BlockletMinMaxIndex minMaxIndex = blockletIndex.getMinMaxIndex(); byte[][] minValues = updateMinValues(minMaxIndex.getMinValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); + byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); + // update min max values in case of old store + byte[][] updatedMinValues = + CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true); + byte[][] updatedMaxValues = + CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false); + row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMinValues), ordinal); // compute and set task level min values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], minValues, + unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMinValues, TASK_MIN_VALUES_INDEX, true); ordinal++; taskMinMaxOrdinal++; - byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); - row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); + row.setRow(addMinMax(minMaxLen, schema[ordinal], updatedMaxValues), ordinal); // compute and set task level max values addTaskMinMaxValues(summaryRow, minMaxLen, - unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], maxValues, + unsafeMemorySummaryDMStore.getSchema()[taskMinMaxOrdinal], updatedMaxValues, TASK_MAX_VALUES_INDEX, false); ordinal++; @@ -624,42 +628,7 @@ public class BlockletDataMap implements DataMap, Cacheable { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } - // getting the start and end index key based on filter for hitting the - // selected block reference nodes based on filter resolver tree. 
- if (LOGGER.isDebugEnabled()) { - LOGGER.debug("preparing the start and end key for finding" - + "start and end block as per filter resolver"); - } List<Blocklet> blocklets = new ArrayList<>(); - Comparator<DataMapRow> comparator = - new BlockletDMComparator(segmentProperties.getColumnsValueSize(), - segmentProperties.getNumberOfSortColumns(), - segmentProperties.getNumberOfNoDictSortColumns()); - List<IndexKey> listOfStartEndKeys = new ArrayList<IndexKey>(2); - FilterUtil - .traverseResolverTreeAndGetStartAndEndKey(segmentProperties, filterExp, listOfStartEndKeys); - // reading the first value from list which has start key - IndexKey searchStartKey = listOfStartEndKeys.get(0); - // reading the last value from list which has end key - IndexKey searchEndKey = listOfStartEndKeys.get(1); - if (null == searchStartKey && null == searchEndKey) { - try { - // TODO need to handle for no dictionary dimensions - searchStartKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties); - // TODO need to handle for no dictionary dimensions - searchEndKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties); - } catch (KeyGenException e) { - return null; - } - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Successfully retrieved the start and end key" + "Dictionary Start Key: " + Arrays - .toString(searchStartKey.getDictionaryKeys()) + "No Dictionary Start Key " + Arrays - .toString(searchStartKey.getNoDictionaryKeys()) + "Dictionary End Key: " + Arrays - .toString(searchEndKey.getDictionaryKeys()) + "No Dictionary End Key " + Arrays - .toString(searchEndKey.getNoDictionaryKeys())); - } if (filterExp == null) { int rowCount = unsafeMemoryDMStore.getRowCount(); for (int i = 0; i < rowCount; i++) { @@ -667,11 +636,13 @@ public class BlockletDataMap implements DataMap, Cacheable { blocklets.add(createBlocklet(safeRow, safeRow.getShort(BLOCKLET_ID_INDEX))); } } else { - int startIndex = findStartIndex(convertToRow(searchStartKey), comparator); - int endIndex = 
findEndIndex(convertToRow(searchEndKey), comparator); + // Remove B-tree jump logic as start and end key prepared is not + // correct for old store scenarios + int startIndex = 0; + int endIndex = unsafeMemoryDMStore.getRowCount(); FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); - while (startIndex <= endIndex) { + while (startIndex < endIndex) { DataMapRow safeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow(); int blockletId = safeRow.getShort(BLOCKLET_ID_INDEX); String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX), http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java index 097dd8c..b672c58 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java @@ -132,13 +132,48 @@ public class BlockletDataRefNodeWrapper implements DataRefNode { public MeasureRawColumnChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes) throws IOException { MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(fileReader); - return measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes); + MeasureRawColumnChunk[] measureRawColumnChunks = + measureColumnChunkReader.readRawMeasureChunks(fileReader, blockIndexes); + updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunks); + return measureRawColumnChunks; } @Override public MeasureRawColumnChunk getMeasureChunk(FileHolder fileReader, 
int blockIndex) throws IOException { MeasureColumnChunkReader measureColumnChunkReader = getMeasureColumnChunkReader(fileReader); - return measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex); + MeasureRawColumnChunk measureRawColumnChunk = + measureColumnChunkReader.readRawMeasureChunk(fileReader, blockIndex); + updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunk); + return measureRawColumnChunk; + } + + /** + * This method is written specifically for old store wherein the measure min and max values + * are written opposite (i.e min in place of max and max in place of min). Due to this computing + * of measure filter with current code is impacted. In order to sync with current min and + * max values, only in case of old store the measures min and max values are reversed + * + * @param measureRawColumnChunk + */ + private void updateMeasureRawColumnChunkMinMaxValues( + MeasureRawColumnChunk measureRawColumnChunk) { + if (blockInfos.get(index).isDataBlockFromOldStore()) { + byte[][] maxValues = measureRawColumnChunk.getMaxValues(); + byte[][] minValues = measureRawColumnChunk.getMinValues(); + measureRawColumnChunk.setMaxValues(minValues); + measureRawColumnChunk.setMinValues(maxValues); + } + } + + private void updateMeasureRawColumnChunkMinMaxValues( + MeasureRawColumnChunk[] measureRawColumnChunks) { + if (blockInfos.get(index).isDataBlockFromOldStore()) { + for (int i = 0; i < measureRawColumnChunks.length; i++) { + if (null != measureRawColumnChunks[i]) { + updateMeasureRawColumnChunkMinMaxValues(measureRawColumnChunks[i]); + } + } + } } private DimensionColumnChunkReader getDimensionColumnChunkReader(FileHolder fileReader) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index c33d5ac..6875f35 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -225,9 +225,20 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { TableBlockInfo info = blockInfo.copy(); BlockletDetailInfo detailInfo = info.getDetailInfo(); detailInfo.setRowCount(blockletInfo.getNumberOfRows()); + // update min and max values in case of old store for measures as min and max is written + // opposite for measures in old store + byte[][] maxValues = CarbonUtil.updateMinMaxValues(fileFooter, + blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues(), + blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues(), false); + byte[][] minValues = CarbonUtil.updateMinMaxValues(fileFooter, + blockletInfo.getBlockletIndex().getMinMaxIndex().getMaxValues(), + blockletInfo.getBlockletIndex().getMinMaxIndex().getMinValues(), true); + blockletInfo.getBlockletIndex().getMinMaxIndex().setMaxValues(maxValues); + blockletInfo.getBlockletIndex().getMinMaxIndex().setMinValues(minValues); detailInfo.setBlockletInfo(blockletInfo); detailInfo.setPagesCount((short) blockletInfo.getNumberOfPages()); detailInfo.setBlockletId(count); + info.setDataBlockFromOldStore(true); tableBlockInfos.add(info); count++; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java index b5b6017..75ec35e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java @@ -35,7 +35,7 @@ public class ColumnFilterInfo implements Serializable { /** * Implicit column filter values to be used for block and blocklet pruning */ - private List<String> implicitColumnFilterList; + private Set<String> implicitColumnFilterList; private transient Set<String> implicitDriverColumnFilterList; private List<Integer> excludeFilterList; /** @@ -85,12 +85,15 @@ public class ColumnFilterInfo implements Serializable { public void setExcludeFilterList(List<Integer> excludeFilterList) { this.excludeFilterList = excludeFilterList; } - public List<String> getImplicitColumnFilterList() { + public Set<String> getImplicitColumnFilterList() { return implicitColumnFilterList; } public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) { - this.implicitColumnFilterList = implicitColumnFilterList; + // this is done to improve the query performance. As the size of the list increases time taken + // to search in list will increase as list contains method uses equals check internally but set + // will be very fast as it will directly use the hash code to find the bucket and search + this.implicitColumnFilterList = new HashSet<>(implicitColumnFilterList); } public List<Object> getMeasuresFilterValuesList() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index e060c84..b62b77d 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -85,6 +85,8 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import 
org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.comparator.Comparator; +import org.apache.carbondata.core.util.comparator.SerializableComparator; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.BlockletHeader; @@ -2397,5 +2399,50 @@ public final class CarbonUtil { return Base64.decodeBase64(objectString.getBytes(CarbonCommonConstants.DEFAULT_CHARSET)); } + /** + * This method will be used to update the min and max values and this will be used in case of + * old store where min and max values for measures are written opposite + * (i.e max values in place of min and min in place of max values) + * + * @param dataFileFooter + * @param maxValues + * @param minValues + * @param isMinValueComparison + * @return + */ + public static byte[][] updateMinMaxValues(DataFileFooter dataFileFooter, byte[][] maxValues, + byte[][] minValues, boolean isMinValueComparison) { + byte[][] updatedMinMaxValues = new byte[maxValues.length][]; + if (isMinValueComparison) { + System.arraycopy(minValues, 0, updatedMinMaxValues, 0, minValues.length); + } else { + System.arraycopy(maxValues, 0, updatedMinMaxValues, 0, maxValues.length); + } + for (int i = 0; i < maxValues.length; i++) { + // update min and max values only for measures + if (!dataFileFooter.getColumnInTable().get(i).isDimensionColumn()) { + DataType dataType = dataFileFooter.getColumnInTable().get(i).getDataType(); + SerializableComparator comparator = Comparator.getComparator(dataType); + int compare; + if (isMinValueComparison) { + compare = comparator + .compare(DataTypeUtil.getMeasureObjectFromDataType(maxValues[i], dataType), + DataTypeUtil.getMeasureObjectFromDataType(minValues[i], dataType)); + if (compare < 0) { + 
updatedMinMaxValues[i] = maxValues[i]; + } + } else { + compare = comparator + .compare(DataTypeUtil.getMeasureObjectFromDataType(minValues[i], dataType), + DataTypeUtil.getMeasureObjectFromDataType(maxValues[i], dataType)); + if (compare > 0) { + updatedMinMaxValues[i] = minValues[i]; + } + } + } + } + return updatedMinMaxValues; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1248bd4b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index fdb5310..984efdb 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -1045,6 +1045,52 @@ public class CarbonUtilTest { Assert.assertTrue(schemaString.length() > schema.length()); } + @Test + public void testUpdateMinMaxValues() { + // create dimension and measure column schema + ColumnSchema dimensionColumnSchema = createColumnSchema(DataTypes.STRING, true); + ColumnSchema measureColumnSchema = createColumnSchema(DataTypes.DOUBLE, false); + List<ColumnSchema> columnSchemas = new ArrayList<>(2); + columnSchemas.add(dimensionColumnSchema); + columnSchemas.add(measureColumnSchema); + // create data file footer object + DataFileFooter fileFooter = new DataFileFooter(); + fileFooter.setColumnInTable(columnSchemas); + // initialise the expected values + int expectedMaxValue = 5; + int expectedMinValue = 2; + double expectedMeasureMaxValue = 28.74; + double expectedMeasureMinValue = -21.46; + // initialise the minValues + byte[][] minValues = new byte[2][]; + minValues[0] = new byte[] { 2 }; + ByteBuffer buffer = ByteBuffer.allocate(8); + minValues[1] = (byte[]) buffer.putDouble(28.74).flip().array(); + buffer = ByteBuffer.allocate(8); + // initialise the maxValues 
+ byte[][] maxValues = new byte[2][]; + maxValues[0] = new byte[] { 5 }; + maxValues[1] = (byte[]) buffer.putDouble(-21.46).flip().array(); + byte[][] updateMaxValues = + CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, false); + byte[][] updateMinValues = + CarbonUtil.updateMinMaxValues(fileFooter, maxValues, minValues, true); + // compare max values + assert (expectedMaxValue == ByteBuffer.wrap(updateMaxValues[0]).get()); + assert (expectedMeasureMaxValue == ByteBuffer.wrap(updateMaxValues[1]).getDouble()); + + // compare min values + assert (expectedMinValue == ByteBuffer.wrap(updateMinValues[0]).get()); + assert (expectedMeasureMinValue == ByteBuffer.wrap(updateMinValues[1]).getDouble()); + } + + private ColumnSchema createColumnSchema(DataType dataType, boolean isDimensionColumn) { + ColumnSchema columnSchema = new ColumnSchema(); + columnSchema.setDataType(dataType); + columnSchema.setDimensionColumn(isDimensionColumn); + return columnSchema; + } + private String generateString(int length) { StringBuilder builder = new StringBuilder(); for (int i = 0; i < length; i++) {
