This is an automated email from the ASF dual-hosted git repository.
liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 3acc2de [CARBONDATA-3700] Optimize prune performance when pruning
with multi-threads
3acc2de is described below
commit 3acc2de351940c00564744ddf5da2a681a481a75
Author: h00424960 <h00424960@SZA150414400A>
AuthorDate: Fri Feb 14 23:25:59 2020 +0800
[CARBONDATA-3700] Optimize prune performance when pruning with
multi-threads
Why is this PR needed?
1. When pruning with multiple threads, a bug hampers pruning performance
heavily.
When pruning leaves no blocklets matching the query filter, the
getExtendedBlocklets function is still triggered to fetch the extended
blocklet metadata. When the input of this function is an empty blocklet list,
it is expected to return an empty extended blocklet list directly, but a bug
currently causes a meaningless "hashset add" overhead.
Meanwhile, when pruning with multiple threads, the getExtendedBlocklets
function is triggered once per blocklet, which should be avoided by triggering
it once per segment.
2. When pruning, another bug hampers pruning performance heavily.
The validatePartitionInfo operation is executed for every blocklet, and it
iterates over all partition info for each blocklet. With millions of blocklets
and hundreds of partitions, the computational complexity reaches hundreds of
millions of iterations.
3. During pruning, a FilterExecuter is created per blocklet, which causes a
severe performance degradation when there are several million blocklets.
Specifically, creating a FilterExecuter is a heavy operation involving a lot of
time-consuming initialization work.
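The fix for the first point is essentially a missing fast path. A simplified
sketch of the guard (mirroring the BlockletDataMapFactory change further down
in this mail; the surrounding conversion logic is elided) looks roughly like
this:

    public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets,
        Segment segment) throws IOException {
      List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>(blocklets.size() + 1);
      // nothing survived pruning: return the empty result directly instead of
      // paying the per-blocklet hashset-add overhead
      if (blocklets.isEmpty()) {
        return detailedBlocklets;
      }
      // ... existing conversion of blocklets into ExtendedBlocklets follows ...
      return detailedBlocklets;
    }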
What changes were proposed in this PR?
1.1 If the input to the getExtendedBlocklets function is an empty blocklet
list, return an empty extended blocklet list directly (see the guard sketched
just above).
1.2 Trigger the getExtendedBlocklets function once per segment instead of once
per blocklet.
2.1 Remove the per-blocklet validatePartitionInfo and perform the partition
validation during getDataMaps processing instead.
3.1 Create the FilterExecuter per segment instead of per blocklet, and share it
across all blocklets (a sketch of the fingerprint idea follows this list).
When a column is added or a sort column is changed and the segment is then
updated, the blocklets in a segment can carry several different column schemas;
the FilterExecuter can only be shared if the column schemas of all blocklets
are the same. So a fingerprint is added for each blocklet to identify its
column schema: if the fingerprints are equal, the column schemas are equal, and
the FilterExecuter can be reused.
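A minimal sketch of the fingerprint idea, assuming column unique IDs are UUID
strings (as SegmentProperties.getFingerprinter in the patch below assumes);
the actual code computes separate fingerprints for dimensions, measures and
complex columns, combines them with different right-shift amounts, and handles
malformed UUIDs, all of which is elided here:

    // Fold the high 64 bits of every column UUID into a single long via XOR.
    private long fingerprintOf(List<ColumnSchema> columns) {
      long fingerprint = Long.MAX_VALUE;
      for (ColumnSchema column : columns) {
        fingerprint ^= UUID.fromString(column.getColumnUniqueId()).getMostSignificantBits();
      }
      return fingerprint;
    }

SegmentProperties.equals() compares these fingerprints, and BlockDataMap.prune()
reuses the segment-level FilterExecuter only when the incoming SegmentProperties
equals the blocklet's own; otherwise it falls back to building a FilterExecuter
from the blocklet's SegmentProperties (see validateSegmentProperties below).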
Does this PR introduce any user interface change?
No.
Is any new testcase added?
Yes.
This closes #3620
---
.../core/constants/CarbonCommonConstants.java | 10 +-
.../carbondata/core/datamap/TableDataMap.java | 60 +++++++++--
.../carbondata/core/datamap/dev/DataMap.java | 5 +-
.../datamap/dev/cgdatamap/CoarseGrainDataMap.java | 3 +-
.../datamap/dev/fgdatamap/FineGrainDataMap.java | 3 +-
.../core/datastore/block/SegmentProperties.java | 112 ++++++++++++++++++++-
.../indexstore/blockletindex/BlockDataMap.java | 29 +++---
.../blockletindex/BlockletDataMapFactory.java | 9 +-
.../carbondata/core/util/CarbonProperties.java | 31 ++++++
.../datastore/block/SegmentPropertiesTest.java | 36 ++++++-
docs/configuration-parameters.md | 1 +
.../datamap/bloom/BloomCoarseGrainDataMap.java | 3 +-
.../datamap/lucene/LuceneFineGrainDataMap.java | 4 +-
.../blockprune/BlockPruneQueryTestCase.scala | 89 ++++++++++++++--
.../testsuite/datamap/CGDataMapTestCase.scala | 6 +-
.../testsuite/datamap/FGDataMapTestCase.scala | 6 +-
16 files changed, 353 insertions(+), 54 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 333c7b6..c020fc2 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1499,8 +1499,14 @@ public final class CarbonCommonConstants {
public static final String
CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT = "4";
- // block prune in multi-thread if files size more than 100K files.
- public static final int
CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 100000;
+ // block prune in multi-thread if the file count exceeds the specified threshold.
+ @CarbonProperty
+ public static final String
CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT =
+ "carbon.driver.pruning.multi.thread.enable.files.count";
+
+ // the default value of file count to trigger block prune in multi-thread is
100K files.
+ public static final String
CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT =
+ "100000";
/**
* max executor threads used for block pruning [1 to 4 threads]
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index cb6fed7..f83d486 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -49,6 +49,9 @@ import
org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.events.Event;
@@ -117,10 +120,14 @@ public final class TableDataMap extends
OperationEventListener {
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
List<Segment> segments = getCarbonSegments(allsegments);
final Map<Segment, List<DataMap>> dataMaps;
- if (filter == null || filter.isEmpty() || partitions == null ||
partitions.isEmpty()) {
- dataMaps = dataMapFactory.getDataMaps(segments);
- } else {
+ if (table.isHivePartitionTable() && filter != null && !filter.isEmpty() &&
partitions != null) {
dataMaps = dataMapFactory.getDataMaps(segments, partitions);
+ } else {
+ dataMaps = dataMapFactory.getDataMaps(segments);
+ }
+
+ if (dataMaps.isEmpty()) {
+ return blocklets;
}
// for non-filter queries
// for filter queries
@@ -138,8 +145,10 @@ public final class TableDataMap extends
OperationEventListener {
}
}
int numOfThreadsForPruning = CarbonProperties.getNumOfThreadsForPruning();
+ int carbonDriverPruningMultiThreadEnableFilesCount =
+ CarbonProperties.getDriverPruningMultiThreadEnableFilesCount();
if (numOfThreadsForPruning == 1 || datamapsCount < numOfThreadsForPruning
|| totalFiles
- <
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) {
+ < carbonDriverPruningMultiThreadEnableFilesCount) {
// use multi-thread, only if the files are more than 0.1 million.
// As 0.1 million files block pruning can take only 1 second.
// Doing multi-thread for smaller values is not recommended as
@@ -181,18 +190,32 @@ public final class TableDataMap extends
OperationEventListener {
List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
Map<Segment, List<DataMap>> dataMaps) throws IOException {
for (Segment segment : segments) {
+ if (dataMaps.get(segment) == null || dataMaps.get(segment).isEmpty()) {
+ continue;
+ }
List<Blocklet> pruneBlocklets = new ArrayList<>();
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentProperties(segment, partitions);
if (filter.isResolvedOnSegment(segmentProperties)) {
+ FilterExecuter filterExecuter = FilterUtil
+ .getFilterExecuterTree(filter.getResolver(), segmentProperties,
+ null, table.getMinMaxCacheColumns(segmentProperties),
+ false);
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(
- dataMap.prune(filter.getResolver(), segmentProperties,
partitions));
+ dataMap.prune(filter.getResolver(), segmentProperties,
partitions, filterExecuter));
}
} else {
+ Expression expression = filter.getExpression();
+ FilterExecuter filterExecuter = FilterUtil
+ .getFilterExecuterTree(new DataMapFilter(segmentProperties, table,
+ expression).getResolver(), segmentProperties,
+ null, table.getMinMaxCacheColumns(segmentProperties),
+ false);
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(
- dataMap.prune(filter.getExpression(), segmentProperties,
partitions, table));
+ dataMap.prune(filter.getExpression(), segmentProperties,
+ partitions, table, filterExecuter));
}
}
blocklets.addAll(
@@ -302,19 +325,30 @@ public final class TableDataMap extends
OperationEventListener {
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
Segment segment = segmentDataMapGroup.getSegment();
if (filter.isResolvedOnSegment(segmentProperties)) {
+ FilterExecuter filterExecuter = FilterUtil
+ .getFilterExecuterTree(filter.getResolver(),
segmentProperties,
+ null, table.getMinMaxCacheColumns(segmentProperties),
+ false);
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
- filter.getResolver(), segmentProperties, partitions);
+ filter.getResolver(), segmentProperties, partitions,
filterExecuter);
pruneBlocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
} else {
+ Expression filterExpression = filter.getNewCopyOfExpression();
+ FilterExecuter filterExecuter = FilterUtil
+ .getFilterExecuterTree(new DataMapFilter(segmentProperties,
table,
+ filterExpression).getResolver(),
segmentProperties,
+ null, table.getMinMaxCacheColumns(segmentProperties),
+ false);
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
- filter.getNewCopyOfExpression(), segmentProperties,
partitions, table);
+ filterExpression, segmentProperties, partitions, table,
+ filterExecuter);
pruneBlocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
@@ -406,10 +440,14 @@ public final class TableDataMap extends
OperationEventListener {
FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws
IOException {
List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
List<Blocklet> blocklets = new ArrayList<>();
+ SegmentProperties segmentProperties =
+
segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(),
partitions);
+ FilterExecuter filterExecuter = FilterUtil
+ .getFilterExecuterTree(filterExp, segmentProperties,
+ null, table.getMinMaxCacheColumns(segmentProperties),
+ false);
for (DataMap dataMap : dataMaps) {
- blocklets.addAll(dataMap.prune(filterExp,
-
segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(),
partitions),
- partitions));
+ blocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions,
filterExecuter));
}
BlockletSerializer serializer = new BlockletSerializer();
String writePath =
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index cc78cff..af36277 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
/**
@@ -46,14 +47,14 @@ public interface DataMap<T extends Blocklet> {
* It returns the list of blocklets where these filters can exist.
*/
List<T> prune(FilterResolverIntf filterExp, SegmentProperties
segmentProperties,
- List<PartitionSpec> partitions) throws IOException;
+ List<PartitionSpec> partitions, FilterExecuter filterExecuter) throws
IOException;
/**
* Prune the datamap with filter expression and partition information. It
returns the list of
* blocklets where these filters can exist.
*/
List<T> prune(Expression filter, SegmentProperties segmentProperties,
- List<PartitionSpec> partitions, CarbonTable carbonTable);
+ List<PartitionSpec> partitions, CarbonTable carbonTable, FilterExecuter
filterExecuter);
/**
* Prune the data maps for finding the row count. It returns a Map of
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
index fc08d92..dd6440d 100644
---
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
+++
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMap.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
/**
* DataMap for Coarse Grain level, see {@link
org.apache.carbondata.core.datamap.DataMapLevel#CG}
@@ -39,7 +40,7 @@ public abstract class CoarseGrainDataMap implements
DataMap<Blocklet> {
@Override
public List<Blocklet> prune(Expression expression, SegmentProperties
segmentProperties,
- List<PartitionSpec> partitions, CarbonTable carbonTable) {
+ List<PartitionSpec> partitions, CarbonTable carbonTable, FilterExecuter
filterExecuter) {
throw new UnsupportedOperationException("Filter expression not supported");
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
index c80dc17..d9fbb1f 100644
---
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
+++
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainDataMap.java
@@ -28,6 +28,7 @@ import
org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
/**
* DataMap for Fine Grain level, see {@link
org.apache.carbondata.core.datamap.DataMapLevel#FG}
@@ -38,7 +39,7 @@ public abstract class FineGrainDataMap implements
DataMap<FineGrainBlocklet> {
@Override
public List<FineGrainBlocklet> prune(Expression filter, SegmentProperties
segmentProperties,
- List<PartitionSpec> partitions, CarbonTable carbonTable) {
+ List<PartitionSpec> partitions, CarbonTable carbonTable, FilterExecuter
filterExecuter) {
throw new UnsupportedOperationException("Filter expression not supported");
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 056d5e4..25540a7 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -25,7 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -35,6 +38,8 @@ import
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.log4j.Logger;
+
/**
* This class contains all the details about the restructuring information of
* the block. This will be used during query execution to handle restructure
@@ -42,6 +47,24 @@ import org.apache.carbondata.core.util.CarbonUtil;
*/
public class SegmentProperties {
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(SegmentProperties.class.getName());
+
+ // When calculating the fingerprint over all columns, the combined
+ // fingerprint of the dimension columns is shifted right by 1 bit to
+ // distinguish it from the other column groups
+ private static final int DIMENSIONS_FINGER_PRINTER_SHIFT = 1;
+
+ // When calculating the fingerprint over all columns, the combined
+ // fingerprint of the measure columns is shifted right by 2 bits to
+ // distinguish it from the other column groups
+ private static final int MEASURES_FINGER_PRINTER_SHIFT = 2;
+
+ // When calculating the fingerprint over all columns, the combined
+ // fingerprint of the complex columns is shifted right by 3 bits to
+ // distinguish it from the other column groups
+ private static final int COMPLEX_FINGER_PRINTER_SHIFT = 3;
+
/**
* list of dimension present in the block
*/
@@ -89,6 +112,25 @@ public class SegmentProperties {
private int lastDimensionColOrdinal;
+ /**
+ * The fingerprint is the XOR result of all the columns in the table.
+ * Two SegmentProperties may contain the same columns but differ in
+ * sort columns, e.g. a column that exists in both is a dimension in
+ * one SegmentProperties but a measure in the other. To distinguish
+ * such SegmentProperties, the XOR result of all dimensions is shifted
+ * right by 1 bit, the XOR result of all measures by 2 bits, and the
+ * XOR result of all complex columns by 3 bits.
+ * To sum up, the formula for generating the fingerprint is:
+ *
+ * fingerprinter = (dimensionsFingerprinter >> 1)
+ * ^ (measuresFingerprinter >> 2) ^ (complexFingerprinter >> 3)
+ * dimensionsFingerprinter = dimension1 ^ dimension2 ^ ...
+ * measuresFingerprinter = measure1 ^ measure2 ^ measure3 ^ ...
+ * complexFingerprinter = complex1 ^ complex2 ^ complex3 ^ ...
+ */
+ private long fingerprinter = Long.MAX_VALUE;
+
public SegmentProperties(List<ColumnSchema> columnsInTable) {
dimensions = new
ArrayList<CarbonDimension>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
complexDimensions =
@@ -130,9 +172,6 @@ public class SegmentProperties {
fillBlockToDimensionOrdinalMapping();
}
- /**
- *
- */
private void fillBlockToDimensionOrdinalMapping() {
Set<Entry<Integer, Integer>> blocks =
dimensionOrdinalToChunkMapping.entrySet();
Iterator<Entry<Integer, Integer>> blockItr = blocks.iterator();
@@ -148,6 +187,31 @@ public class SegmentProperties {
}
/**
+ * compare the segmentproperties based on fingerprinter
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SegmentProperties)) {
+ return false;
+ }
+ // If these two segmentproperties have different number of columns
+ // Return false directly
+ SegmentProperties segmentProperties = (SegmentProperties) obj;
+ if (this.getNumberOfColumns() != segmentProperties.getNumberOfColumns()) {
+ return false;
+ }
+ // Compare the fingerprinter
+ return getFingerprinter() != Long.MIN_VALUE &&
+ segmentProperties.getFingerprinter() != Long.MIN_VALUE &&
+ (getFingerprinter() == segmentProperties.getFingerprinter());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ /**
* Below method will be used to add the complex dimension child
* block index.It is a recursive method which will be get the children
* add the block index
@@ -184,6 +248,48 @@ public class SegmentProperties {
}
/**
+ * fingerprinter = (dimensionsFingerprinter >> 1)
+ * ^ (measuresFingerprinter >> 2) ^ (complexFingerprinter >> 3)
+ * dimensionsFingerprinter = dimension1 ^ dimension2 ^ ...
+ * measuresFingerprinter = measure1 ^ measure2 ^ measure3 ^ ...
+ * complexFingerprinter = complex1 ^ complex2 ^ complex3 ^ ...
+ */
+ protected long getFingerprinter() {
+ if (this.fingerprinter == Long.MAX_VALUE) {
+ long dimensionsFingerPrinter = getFingerprinter(this.dimensions.stream()
+ .map(t -> t.getColumnSchema()).collect(Collectors.toList()));
+ long measuresFingerPrinter = getFingerprinter(this.measures.stream()
+ .map(t -> t.getColumnSchema()).collect(Collectors.toList()));
+ long complexFingerPrinter =
getFingerprinter(this.complexDimensions.stream()
+ .map(t -> t.getColumnSchema()).collect(Collectors.toList()));
+ this.fingerprinter = (dimensionsFingerPrinter >>
DIMENSIONS_FINGER_PRINTER_SHIFT)
+ ^ (measuresFingerPrinter >> MEASURES_FINGER_PRINTER_SHIFT)
+ ^ (complexFingerPrinter >> COMPLEX_FINGER_PRINTER_SHIFT);
+ }
+ return this.fingerprinter;
+ }
+
+ private long getFingerprinter(List<ColumnSchema> columns) {
+ int counter = 0;
+ ColumnSchema columnSchema = null;
+ long fingerprint = Long.MAX_VALUE;
+ while (counter < columns.size()) {
+ columnSchema = columns.get(counter);
+ UUID columnUUID = null;
+ try {
+ columnUUID = UUID.fromString(columnSchema.getColumnUniqueId());
+ } catch (Exception e) {
+ LOG.error("Invalid UUID string: " + columnSchema.getColumnUniqueId());
+ return Long.MIN_VALUE;
+ }
+ long columnUUIDToBits = columnUUID.getMostSignificantBits();
+ fingerprint = fingerprint ^ columnUUIDToBits;
+ counter++;
+ }
+ return fingerprint;
+ }
+
+ /**
* below method will fill dimension and measure detail of the block.
*
* @param columnsInTable
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index acccf8a..0ffb7c0 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -652,7 +652,8 @@ public class BlockDataMap extends CoarseGrainDataMap
return blockletToRowCountMap;
}
- private List<Blocklet> prune(FilterResolverIntf filterExp) {
+ private List<Blocklet> prune(FilterResolverIntf filterExp, FilterExecuter
filterExecuter,
+ SegmentProperties segmentProperties) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
@@ -676,10 +677,13 @@ public class BlockDataMap extends CoarseGrainDataMap
// Remove B-tree jump logic as start and end key prepared is not
// correct for old store scenarios
int entryIndex = 0;
- FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
- filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(),
false);
// flag to be used for deciding whether use min/max in executor pruning
for BlockletDataMap
boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp);
+ if (!validateSegmentProperties(segmentProperties)) {
+ filterExecuter = FilterUtil
+ .getFilterExecuterTree(filterExp, getSegmentProperties(),
+ null, getMinMaxCacheColumns(), false);
+ }
// min and max for executor pruning
while (entryIndex < numEntries) {
DataMapRow row = memoryDMStore.getDataMapRow(schema, entryIndex);
@@ -713,30 +717,23 @@ public class BlockDataMap extends CoarseGrainDataMap
@Override
public List<Blocklet> prune(Expression expression, SegmentProperties
properties,
- List<PartitionSpec> partitions, CarbonTable carbonTable) {
+ List<PartitionSpec> partitions, CarbonTable carbonTable, FilterExecuter
filterExecuter) {
return prune(new DataMapFilter(properties, carbonTable,
expression).getResolver(), properties,
- partitions);
+ partitions, filterExecuter);
}
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties
segmentProperties,
- List<PartitionSpec> partitions) {
+ List<PartitionSpec> partitions, FilterExecuter filterExecuter) {
if (memoryDMStore.getRowCount() == 0) {
return new ArrayList<>();
}
- // if it has partitioned datamap but there is no partitioned information
stored, it means
- // partitions are dropped so return empty list.
- if (partitions != null) {
- if (!validatePartitionInfo(partitions)) {
- return new ArrayList<>();
- }
- }
// Prune with filters if the partitions are existed in this datamap
// changed segmentProperties to this.segmentProperties to make sure the
pruning with its own
// segmentProperties.
// Its a temporary fix. The Interface DataMap.prune(FilterResolverIntf
filterExp,
// SegmentProperties segmentProperties, List<PartitionSpec> partitions)
should be corrected
- return prune(filterExp);
+ return prune(filterExp, filterExecuter, segmentProperties);
}
private boolean validatePartitionInfo(List<PartitionSpec> partitions) {
@@ -947,6 +944,10 @@ public class BlockDataMap extends CoarseGrainDataMap
return memoryUsed;
}
+ protected boolean validateSegmentProperties(SegmentProperties
tableSegmentProperties) {
+ return tableSegmentProperties.equals(getSegmentProperties());
+ }
+
protected SegmentProperties getSegmentProperties() {
return segmentPropertiesWrapper.getSegmentProperties();
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 8a6545b..930d031 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -166,7 +166,7 @@ public class BlockletDataMapFactory extends
CoarseGrainDataMapFactory
List<TableBlockIndexUniqueIdentifierWrapper>
tableBlockIndexUniqueIdentifierWrappers,
Set<TableBlockIndexUniqueIdentifier> identifiers) {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
identifiers) {
- if (null != partitionsToPrune && !partitionsToPrune.isEmpty()) {
+ if (null != partitionsToPrune) {
// add only tableBlockUniqueIdentifier that matches the partition
// get the indexFile Parent path and compare with the PartitionPath,
if matches, then add
// the corresponding tableBlockIndexUniqueIdentifier for pruning
@@ -232,6 +232,10 @@ public class BlockletDataMapFactory extends
CoarseGrainDataMapFactory
public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets,
Segment segment)
throws IOException {
List<ExtendedBlocklet> detailedBlocklets = new
ArrayList<>(blocklets.size() + 1);
+ // if the blocklet list is empty, return the empty detailed blocklet list
directly.
+ if (blocklets.size() == 0) {
+ return detailedBlocklets;
+ }
// If it is already detailed blocklet then type cast and return same
if (blocklets.size() > 0 && blocklets.get(0) instanceof ExtendedBlocklet) {
for (Blocklet blocklet : blocklets) {
@@ -493,7 +497,8 @@ public class BlockletDataMapFactory extends
CoarseGrainDataMapFactory
List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, partitions);
for (CoarseGrainDataMap dataMap : dataMaps) {
blocklets.addAll(dataMap
- .prune((FilterResolverIntf) null, getSegmentProperties(segment,
partitions), partitions));
+ .prune((FilterResolverIntf) null, getSegmentProperties(segment,
partitions), partitions,
+ null));
}
return blocklets;
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f322592..81faba0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1827,6 +1827,9 @@ public final class CarbonProperties {
}
}
+ /**
+ * This method validates the numOfThreadsForPruning
+ */
public static int getNumOfThreadsForPruning() {
int numOfThreadsForPruning =
Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING,
@@ -1844,6 +1847,34 @@ public final class CarbonProperties {
}
/**
+ * This method validates the driverPruningMultiThreadEnableFilesCount
+ */
+ public static int getDriverPruningMultiThreadEnableFilesCount() {
+ int driverPruningMultiThreadEnableFilesCount = 0;
+ try {
+ driverPruningMultiThreadEnableFilesCount =
Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(
+
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT,
+
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT));
+ if (driverPruningMultiThreadEnableFilesCount <= 0) {
+ LOGGER.info("The driver prunning multithread enable files count value
\""
+ + driverPruningMultiThreadEnableFilesCount
+ + "\" is invalid. Using the default value \""
+ +
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT);
+ driverPruningMultiThreadEnableFilesCount =
Integer.parseInt(CarbonCommonConstants
+ .CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info("The driver prunning multithread enable files count value " +
+ "is invalid. Using the default value \""
+ +
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT);
+ driverPruningMultiThreadEnableFilesCount =
Integer.parseInt(CarbonCommonConstants
+ .CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT);
+ }
+ return driverPruningMultiThreadEnableFilesCount;
+ }
+
+ /**
* Validate and get the input metrics interval
*
* @return input metrics interval
diff --git
a/core/src/test/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesTest.java
b/core/src/test/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesTest.java
index c3c89b4..b0d6b12 100644
---
a/core/src/test/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesTest.java
+++
b/core/src/test/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -38,8 +39,9 @@ public class SegmentPropertiesTest extends TestCase {
private SegmentProperties blockMetadataInfos;
+ private List<ColumnSchema> columnSchema = new ArrayList<ColumnSchema>();
+
@BeforeClass public void setUp() {
- List<ColumnSchema> columnSchema = new ArrayList<ColumnSchema>();
columnSchema.add(getDimensionColumn1());
columnSchema.add(getDimensionColumn2());
columnSchema.add(getDimensionColumn3());
@@ -55,6 +57,25 @@ public class SegmentPropertiesTest extends TestCase {
blockMetadataInfos = new SegmentProperties(columnSchema);
}
+ @Test public void testTwoSegmentPropertiesAreEqualsWithEachOther() {
+ List<ColumnSchema> columnSchemasClone = columnSchema.stream()
+ .map(t -> deepCopy(t))
+ .collect(Collectors.toList());
+
+ SegmentProperties segmentPropertiesWithSameColumnAndSameOrder = new
SegmentProperties(columnSchemasClone);
+
assertTrue(blockMetadataInfos.equals(segmentPropertiesWithSameColumnAndSameOrder));
+
+ columnSchemasClone.add(columnSchemasClone.remove(5));
+ columnSchemasClone.add(columnSchemasClone.remove(1));
+ SegmentProperties segmentPropertiesWithSameColumnButDifferentOrder = new
SegmentProperties(columnSchemasClone);
+
assertFalse(blockMetadataInfos.equals(segmentPropertiesWithSameColumnButDifferentOrder));
+
+ columnSchemasClone.remove(2);
+ SegmentProperties segmentPropertiesWithDifferentColumn = new
SegmentProperties(columnSchemasClone);
+
+
assertFalse(blockMetadataInfos.equals(segmentPropertiesWithDifferentColumn));
+ }
+
@Test public void testBlockMetadataHasProperDimensionCardinality() {
int[] cardinality = {-1, -1, -1, -1, -1, -1, -1, -1};
boolean isProper = true;
@@ -139,6 +160,19 @@ public class SegmentPropertiesTest extends TestCase {
assertTrue(true);
}
+ private ColumnSchema deepCopy(ColumnSchema scr) {
+ ColumnSchema dest = new ColumnSchema();
+ dest.setColumnName(scr.getColumnName());
+ dest.setColumnUniqueId(scr.getColumnUniqueId());
+ dest.setDataType(scr.getDataType());
+ if (scr.isDimensionColumn()) {
+ dest.setDimensionColumn(scr.isDimensionColumn());
+ dest.setNumberOfChild(scr.getNumberOfChild());
+ }
+ dest.setEncodingList(scr.getEncodingList());
+ return dest;
+ }
+
private ColumnSchema getDimensionColumn1() {
ColumnSchema dimColumn = new ColumnSchema();
dimColumn.setColumnName("IMEI");
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 932f3c6..1f521bdc 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -145,6 +145,7 @@ This section provides the details of all the configurations
required for the Car
| carbon.push.rowfilters.for.vector | false | When enabled complete row
filters will be handled by carbon in case of vector. If it is disabled then
only page level pruning will be done by carbon and row level filtering will be
done by spark for vector. And also there are scan optimizations in carbon to
avoid multiple data copies when this parameter is set to false. There is no
change in flow for non-vector based queries. |
| carbon.query.prefetch.enable | true | By default this property is true, so
prefetch is used in query to read next blocklet asynchronously in other thread
while processing current blocklet in main thread. This can help to reduce CPU
idle time. Setting this property false will disable this prefetch feature in
query. |
| carbon.query.stage.input.enable | false | Stage input files are data files
written by external applications (such as Flink), but have not been loaded into
carbon table. Enabling this configuration makes query to include these files,
thus makes query on latest data. However, since these files are not indexed,
query maybe slower as full scan is required for these files. |
+| carbon.driver.pruning.multi.thread.enable.files.count | 100000 | Block
pruning is done with multiple driver threads when the total number of segment
files for a query exceeds this value. |
## Data Mutation Configuration
| Parameter | Default Value | Description |
diff --git
a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 6917205..d55e681 100644
---
a/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++
b/index/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -47,6 +47,7 @@ import
org.apache.carbondata.core.scan.expression.conditional.InExpression;
import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
import
org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -132,7 +133,7 @@ public class BloomCoarseGrainDataMap extends
CoarseGrainDataMap {
@Override
public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties
segmentProperties,
- List<PartitionSpec> partitions) {
+ List<PartitionSpec> partitions, FilterExecuter filterExecuter) {
Set<Blocklet> hitBlocklets = null;
if (filterExp == null) {
// null is different from empty here. Empty means after pruning, no
blocklet need to scan.
diff --git
a/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
b/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index da1fe5c..0b1acaf 100644
---
a/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++
b/index/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -36,6 +36,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.MatchExpression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
@@ -201,7 +202,8 @@ public class LuceneFineGrainDataMap extends
FineGrainDataMap {
*/
@Override
public List<FineGrainBlocklet> prune(FilterResolverIntf filterExp,
- SegmentProperties segmentProperties, List<PartitionSpec> partitions)
throws IOException {
+ SegmentProperties segmentProperties, List<PartitionSpec> partitions,
+ FilterExecuter filterExecuter) throws IOException {
// convert filter expr into lucene list query
List<String> fields = new ArrayList<String>();
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
index ba8ec1f..fc6c8bd 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
@@ -18,9 +18,11 @@ package org.apache.carbondata.spark.testsuite.blockprune
import java.io.DataOutputStream
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
/**
@@ -28,6 +30,28 @@ import org.apache.spark.sql.test.util.QueryTest
*/
class BlockPruneQueryTestCase extends QueryTest with BeforeAndAfterAll {
val outputPath = s"$resourcesPath/block_prune_test.csv"
+ val enableMultiThreadFilesCount = "1"
+ val negativeMultiThreadFilesCount = "-1"
+ val disableMultiThreadFilesCount =
+
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT_DEFAULT;
+
+ def perpareCarbonProperty(propertyName:String,
+ propertyValue:String): Unit ={
+ val properties = CarbonProperties.getInstance()
+ properties.addProperty(propertyName, propertyValue)
+ assert(properties.getProperty(propertyName).equals(propertyValue))
+ }
+
+ def perpareData(): Unit ={
+ sql(
+ """CREATE TABLE IF NOT EXISTS blockprune (name string, id int)
+ STORED AS carbondata""")
+ sql(
+ s"LOAD DATA LOCAL INPATH '$outputPath' INTO table blockprune
options('FILEHEADER'='name,id')"
+ )
+ }
+
+
override def beforeAll {
// Since the data needed for block prune is big, need to create a temp
data file
val testData: Array[String]= new Array[String](3)
@@ -59,19 +83,64 @@ class BlockPruneQueryTestCase extends QueryTest with
BeforeAndAfterAll {
}
}
}
+ }
+
+ test("test block prune without filter") {
+ sql("DROP TABLE IF EXISTS blockprune")
+ perpareData()
+ checkAnswer(
+ sql(
+ """select * from blockprune limit 1"""),
+ Seq(Row("a", 0)))
+ }
+ test("test block prune with negative multiThreadFilesCount") {
sql("DROP TABLE IF EXISTS blockprune")
+
perpareCarbonProperty(CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT,
+ negativeMultiThreadFilesCount)
+ perpareData()
+ checkAnswer(
+ sql(
+ """select * from blockprune limit 1"""),
+ Seq(Row("a", 0)))
}
- test("test block prune query") {
+ test("test block prune single thread") {
+ sql("DROP TABLE IF EXISTS blockprune")
+ perpareData()
+ // data is in all 7 blocks
+ checkAnswer(
+ sql(
+ """select name,count(name) as amount from blockprune
+ where name='c' or name='b' or name='a' group by name"""),
+ Seq(Row("a", 240001), Row("b", 240001), Row("c", 240001)))
+
+ // data only in middle 3/4/5 blocks
+ checkAnswer(
+ sql(
+ """select name,count(name) as amount from blockprune
+ where name='b' group by name"""),
+ Seq(Row("b", 240001)))
+ }
+
+ test("test block prune multi threads") {
+ sql("DROP TABLE IF EXISTS blockprune")
+
+
perpareCarbonProperty(CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT,
+ enableMultiThreadFilesCount)
+
sql(
"""
CREATE TABLE IF NOT EXISTS blockprune (name string, id int)
STORED AS carbondata
""")
- sql(
+
+ val segmentCount = 10
+ for (i <- 1 to segmentCount) {
+ sql(
s"LOAD DATA LOCAL INPATH '$outputPath' INTO table blockprune
options('FILEHEADER'='name,id')"
)
+ }
// data is in all 7 blocks
checkAnswer(
sql(
@@ -79,16 +148,17 @@ class BlockPruneQueryTestCase extends QueryTest with
BeforeAndAfterAll {
select name,count(name) as amount from blockprune
where name='c' or name='b' or name='a' group by name
"""),
- Seq(Row("a", 240001), Row("b", 240001), Row("c", 240001)))
+ Seq(Row("a", 2400010), Row("b", 2400010), Row("c", 2400010)))
// data only in middle 3/4/5 blocks
checkAnswer(
sql(
- """
- select name,count(name) as amount from blockprune
- where name='b' group by name
- """),
- Seq(Row("b", 240001)))
+ """select name,count(name) as amount from blockprune
+ where name='b' group by name"""),
+ Seq(Row("b", 2400010)))
+
+
perpareCarbonProperty(CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT,
+ disableMultiThreadFilesCount)
}
override def afterAll {
@@ -101,8 +171,9 @@ class BlockPruneQueryTestCase extends QueryTest with
BeforeAndAfterAll {
} catch {
case ex: Exception =>
LOGGER.error("Delete temp test data file for block prune catching
exception", ex)
+ } finally {
+ sql("DROP TABLE IF EXISTS blockprune")
}
- sql("DROP TABLE IF EXISTS blockprune")
}
}
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index f29a94a..e8e4a17 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -22,12 +22,10 @@ import java.io.{ByteArrayInputStream, DataOutputStream,
ObjectInputStream, Objec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta,
Segment}
import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel,
DataMapWriter}
@@ -44,6 +42,7 @@ import
org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetad
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable,
DataMapSchema, DiskBasedDMSchemaStorageProvider}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
@@ -200,7 +199,8 @@ class CGDataMap extends CoarseGrainDataMap {
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
- partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
+ partitions: java.util.List[PartitionSpec],
+ filterExecuter: FilterExecuter): java.util.List[Blocklet] = {
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 4d848a6..b52f7e2 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -20,12 +20,10 @@ import java.io.{ByteArrayInputStream, DataOutputStream,
ObjectInputStream, Objec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta,
Segment}
import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel,
DataMapWriter}
@@ -42,6 +40,7 @@ import
org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSch
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
@@ -195,7 +194,8 @@ class FGDataMap extends FineGrainDataMap {
override def prune(
filterExp: FilterResolverIntf,
segmentProperties: SegmentProperties,
- partitions: java.util.List[PartitionSpec]):
java.util.List[FineGrainBlocklet] = {
+ partitions: java.util.List[PartitionSpec],
+ filterExecuter: FilterExecuter): java.util.List[FineGrainBlocklet] = {
val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
val expression = filterExp.getFilterExpression
getEqualToExpression(expression, buffer)