Repository: carbondata Updated Branches: refs/heads/master fef3384b8 -> 2bad144a2
[CARBONDATA-1867] Add support for task/segment level pruning Added support for task/segment level pruning. Added code to compute task level min/max values, which are used for task/segment level pruning. This closes #1624 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2bad144a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2bad144a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2bad144a Branch: refs/heads/master Commit: 2bad144a2f13bcdf5518490c8c08bad25d9ea84d Parents: fef3384 Author: manishgupta88 <[email protected]> Authored: Wed Dec 6 17:11:51 2017 +0530 Committer: ravipesala <[email protected]> Committed: Thu Dec 7 16:35:44 2017 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 26 +++ .../carbondata/core/datamap/dev/DataMap.java | 7 + .../blockletindex/BlockletDataMap.java | 169 ++++++++++++++++--- .../scan/filter/FilterExpressionProcessor.java | 13 ++ .../core/scan/filter/SingleTableProvider.java | 3 +- .../hadoop/api/CarbonTableInputFormat.java | 6 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +- 7 files changed, 195 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 705a9fd..1d5c978 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import 
org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.indexstore.Blocklet; @@ -165,4 +166,29 @@ public final class TableDataMap implements OperationEventListener { @Override public void onEvent(Event event, OperationContext opContext) { dataMapFactory.fireEvent(event); } + + /** + * Method to prune the segments; a segment is kept if any of its datamaps (e.g. via task min/max values) requires a scan + * + * @param segmentIds ids of the segments to check + * @param filterExp resolved filter expression used for pruning + * @return ids of the segments in which at least one datamap requires a scan + * @throws IOException if fetching the datamaps of a segment fails + */ + public List<String> pruneSegments(List<String> segmentIds, FilterResolverIntf filterExp) + throws IOException { + List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + for (String segmentId : segmentIds) { + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId); + for (DataMap dataMap : dataMaps) { + if (dataMap.isScanRequired(filterExp)) { + // If any one task in a given segment may contain matching data, the segment needs to + // be scanned, so the remaining data maps of the same segment need not be validated + prunedSegments.add(segmentId); + break; + } + } + } + return prunedSegments; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index ada23ad..e7c30a9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -42,6 +42,13 @@ public interface DataMap { */ List<Blocklet> prune(FilterResolverIntf filterExp); + /** + * Checks whether the data covered by this datamap needs to be scanned for the given filter + * + * @param filterExp resolved filter expression to evaluate + * 
@return + */ + boolean isScanRequired(FilterResolverIntf filterExp); /** * Clear complete index table and release memory. http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index fd514ea..1dc9b7a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -55,10 +55,12 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.DataTypeUtil; @@ -91,8 +93,14 @@ public class BlockletDataMap implements DataMap, Cacheable { private static int BLOCK_INFO_INDEX = 8; + private static int TASK_MIN_VALUES_INDEX = 0; + + private static int TASK_MAX_VALUES_INDEX = 1; + private UnsafeMemoryDMStore unsafeMemoryDMStore; + private UnsafeMemoryDMStore unsafeMemoryTaskMinMaxDMStore; + private 
SegmentProperties segmentProperties; private int[] columnCardinality; @@ -111,6 +119,7 @@ public class BlockletDataMap implements DataMap, Cacheable { columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); segmentProperties = new SegmentProperties(columnInTable, columnCardinality); createSchema(segmentProperties); + createTaskMinMaxSchema(segmentProperties); } TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); if (fileFooter.getBlockletList() == null || fileFooter.getBlockletList().size() == 0) { @@ -124,6 +133,9 @@ public class BlockletDataMap implements DataMap, Cacheable { if (unsafeMemoryDMStore != null) { unsafeMemoryDMStore.finishWriting(); } + if (null != unsafeMemoryTaskMinMaxDMStore) { + unsafeMemoryTaskMinMaxDMStore.finishWriting(); + } LOGGER.info( "Time taken to load blocklet datamap from file : " + dataMapModel.getFilePath() + "is " + ( System.currentTimeMillis() - startTime)); @@ -134,27 +146,41 @@ public class BlockletDataMap implements DataMap, Cacheable { int[] minMaxLen = segmentProperties.getColumnsValueSize(); List<BlockletInfo> blockletList = fileFooter.getBlockletList(); CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); + DataMapRow taskMinMaxRow = null; + // Add one row to maintain task level min max for segment pruning + if (!blockletList.isEmpty()) { + taskMinMaxRow = new DataMapRowImpl(unsafeMemoryTaskMinMaxDMStore.getSchema()); + } for (int index = 0; index < blockletList.size(); index++) { DataMapRow row = new DataMapRowImpl(schema); int ordinal = 0; + int taskMinMaxOrdinal = 0; BlockletInfo blockletInfo = blockletList.get(index); // add start key as index key row.setByteArray(blockletInfo.getBlockletIndex().getBtreeIndex().getStartKey(), ordinal++); BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex(); - row.setRow(addMinMax(minMaxLen, schema[ordinal], - updateMinValues(minMaxIndex.getMinValues(), minMaxLen)), ordinal); + byte[][] minValues = 
updateMinValues(minMaxIndex.getMinValues(), minMaxLen); + row.setRow(addMinMax(minMaxLen, schema[ordinal], minValues), ordinal); + // compute and set task level min values + addTaskMinMaxValues(taskMinMaxRow, minMaxLen, + unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], minValues, + TASK_MIN_VALUES_INDEX, true); ordinal++; - row.setRow(addMinMax(minMaxLen, schema[ordinal], - updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen)), ordinal); + taskMinMaxOrdinal++; + byte[][] maxValues = updateMaxValues(minMaxIndex.getMaxValues(), minMaxLen); + row.setRow(addMinMax(minMaxLen, schema[ordinal], maxValues), ordinal); + // compute and set task level max values + addTaskMinMaxValues(taskMinMaxRow, minMaxLen, + unsafeMemoryTaskMinMaxDMStore.getSchema()[taskMinMaxOrdinal], maxValues, + TASK_MAX_VALUES_INDEX, false); ordinal++; row.setInt(blockletInfo.getNumberOfRows(), ordinal++); // add file path - byte[] filePathBytes = - filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); + byte[] filePathBytes = filePath.getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS); row.setByteArray(filePathBytes, ordinal++); // add pages @@ -179,6 +205,18 @@ public class BlockletDataMap implements DataMap, Cacheable { throw new RuntimeException(e); } } + // write the task level min/max row to unsafe memory store + if (null != taskMinMaxRow) { + addTaskMinMaxRowToUnsafeMemoryStore(taskMinMaxRow); + } + } + + private void addTaskMinMaxRowToUnsafeMemoryStore(DataMapRow taskMinMaxRow) { + try { + unsafeMemoryTaskMinMaxDMStore.addIndexRowToUnsafe(taskMinMaxRow); + } catch (Exception e) { + throw new RuntimeException(e); + } } /** @@ -270,28 +308,54 @@ public class BlockletDataMap implements DataMap, Cacheable { return minRow; } + /** + * This method will compute min/max values at task level + * + * @param taskMinMaxRow + * @param minMaxLen + * @param carbonRowSchema + * @param minMaxValue + * @param ordinal + * @param isMinValueComparison + */ + private void 
addTaskMinMaxValues(DataMapRow taskMinMaxRow, int[] minMaxLen, + CarbonRowSchema carbonRowSchema, byte[][] minMaxValue, int ordinal, + boolean isMinValueComparison) { + DataMapRow row = taskMinMaxRow.getRow(ordinal); + byte[][] updatedMinMaxValues = minMaxValue; + if (null == row) { + CarbonRowSchema[] minSchemas = + ((CarbonRowSchema.StructCarbonRowSchema) carbonRowSchema).getChildSchemas(); + row = new DataMapRowImpl(minSchemas); + } else { + byte[][] existingMinMaxValues = getMinMaxValue(taskMinMaxRow, ordinal); + // Compare and update min max values + for (int i = 0; i < minMaxLen.length; i++) { + int compare = + ByteUtil.UnsafeComparer.INSTANCE.compareTo(existingMinMaxValues[i], minMaxValue[i]); + if (isMinValueComparison) { + if (compare < 0) { + updatedMinMaxValues[i] = existingMinMaxValues[i]; + } + } else if (compare > 0) { + updatedMinMaxValues[i] = existingMinMaxValues[i]; + } + } + } + int minMaxOrdinal = 0; + // min/max value adding + for (int i = 0; i < minMaxLen.length; i++) { + row.setByteArray(updatedMinMaxValues[i], minMaxOrdinal++); + } + taskMinMaxRow.setRow(row, ordinal); + } + private void createSchema(SegmentProperties segmentProperties) throws MemoryException { List<CarbonRowSchema> indexSchemas = new ArrayList<>(); // Index key indexSchemas.add(new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY)); - int[] minMaxLen = segmentProperties.getColumnsValueSize(); - // do it 2 times, one for min and one for max. 
- for (int k = 0; k < 2; k++) { - CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length]; - for (int i = 0; i < minMaxLen.length; i++) { - if (minMaxLen[i] <= 0) { - mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); - } else { - mapSchemas[i] = - new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]); - } - } - CarbonRowSchema mapSchema = - new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), - mapSchemas); - indexSchemas.add(mapSchema); - } + getMinMaxSchema(segmentProperties, indexSchemas); // for number of rows. indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.INT)); @@ -315,6 +379,51 @@ public class BlockletDataMap implements DataMap, Cacheable { new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); } + private void createTaskMinMaxSchema(SegmentProperties segmentProperties) throws MemoryException { + List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2); + getMinMaxSchema(segmentProperties, taskMinMaxSchemas); + unsafeMemoryTaskMinMaxDMStore = new UnsafeMemoryDMStore( + taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()])); + } + + private void getMinMaxSchema(SegmentProperties segmentProperties, + List<CarbonRowSchema> minMaxSchemas) { + // Index key + int[] minMaxLen = segmentProperties.getColumnsValueSize(); + // do it 2 times, one for min and one for max. 
+ for (int k = 0; k < 2; k++) { + CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length]; + for (int i = 0; i < minMaxLen.length; i++) { + if (minMaxLen[i] <= 0) { + mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY); + } else { + mapSchemas[i] = + new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]); + } + } + CarbonRowSchema mapSchema = + new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(), + mapSchemas); + minMaxSchemas.add(mapSchema); + } + } + + @Override + public boolean isScanRequired(FilterResolverIntf filterExp) { + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + for (int i = 0; i < unsafeMemoryTaskMinMaxDMStore.getRowCount(); i++) { + DataMapRow unsafeRow = unsafeMemoryTaskMinMaxDMStore.getUnsafeRow(i); + boolean isScanRequired = FilterExpressionProcessor + .isScanRequired(filterExecuter, getMinMaxValue(unsafeRow, TASK_MAX_VALUES_INDEX), + getMinMaxValue(unsafeRow, TASK_MIN_VALUES_INDEX)); + if (isScanRequired) { + return true; + } + } + return false; + } + @Override public List<Blocklet> prune(FilterResolverIntf filterExp) { @@ -562,6 +671,11 @@ public class BlockletDataMap implements DataMap, Cacheable { unsafeMemoryDMStore = null; segmentProperties = null; } + // clear task min/max unsafe memory + if (null != unsafeMemoryTaskMinMaxDMStore) { + unsafeMemoryTaskMinMaxDMStore.freeMemory(); + unsafeMemoryTaskMinMaxDMStore = null; + } } @Override @@ -576,11 +690,14 @@ public class BlockletDataMap implements DataMap, Cacheable { @Override public long getMemorySize() { + long memoryUsed = 0L; if (unsafeMemoryDMStore != null) { - return unsafeMemoryDMStore.getMemoryUsed(); - } else { - return 0; + memoryUsed += unsafeMemoryDMStore.getMemoryUsed(); + } + if (null != unsafeMemoryTaskMinMaxDMStore) { + memoryUsed += unsafeMemoryTaskMinMaxDMStore.getMemoryUsed(); } + return memoryUsed; } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java index e77d58e..6c804d7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java @@ -48,6 +48,7 @@ import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression import org.apache.carbondata.core.scan.expression.conditional.ListExpression; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; import org.apache.carbondata.core.scan.filter.intf.ExpressionType; import org.apache.carbondata.core.scan.filter.partition.AndFilterImpl; import org.apache.carbondata.core.scan.filter.partition.EqualToFilterImpl; @@ -529,4 +530,16 @@ public class FilterExpressionProcessor implements FilterProcessor { } return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier); } + + public static boolean isScanRequired(FilterExecuter filterExecuter, byte[][] maxValue, + byte[][] minValue) { + if (filterExecuter instanceof ImplicitColumnFilterExecutor) { + return ((ImplicitColumnFilterExecutor) filterExecuter) + .isFilterValuesPresentInAbstractIndex(maxValue, minValue); + } else { + // otherwise decide based on min/max value + BitSet bitSet = filterExecuter.isScanRequired(maxValue, minValue); + return !bitSet.isEmpty(); + } + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java index 88fc8a6..d72e798 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/SingleTableProvider.java @@ -31,7 +31,8 @@ public class SingleTableProvider implements TableProvider { @Override public CarbonTable getCarbonTable(CarbonTableIdentifier carbonTableIdentifier) throws IOException { - if (carbonTable.getCarbonTableIdentifier().equals(carbonTableIdentifier)) { + if (carbonTable.getCarbonTableIdentifier().getTableUniqueName() + .equals(carbonTableIdentifier.getTableUniqueName())) { return carbonTable; } else { throw new IOException("Carbon table does not exist with identifier " + carbonTableIdentifier); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index e4e9ceb..15d1304 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -159,7 +159,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { /** * Get the cached CarbonTable or create it by TableInfo in `configuration` */ - private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + 
public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { if (carbonTable == null) { // carbon table should be created either from deserialized table info (schema saved in // hive metastore) or by reading schema in HDFS (schema saved in HDFS) @@ -303,7 +303,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); } - private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException { String tablePath = configuration.get(INPUT_DIR, ""); try { @@ -887,7 +887,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { /** * return valid segment to access */ - private String[] getSegmentsToAccess(JobContext job) { + public String[] getSegmentsToAccess(JobContext job) { String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); if (segmentString.trim().isEmpty()) { return new String[0]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2bad144a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 1fa838c..d599c22 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -328,7 +328,7 @@ class CarbonScanRDD( iterator.asInstanceOf[Iterator[InternalRow]] } - private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = { + def prepareInputFormatForDriver(conf: Configuration): 
CarbonTableInputFormat[Object] = { CarbonTableInputFormat.setTableInfo(conf, tableInfo) CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
