This is an automated email from the ASF dual-hosted git repository.
kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 488a547 [CARBONDATA-3555] Move filter related methods under DataMapFilter
488a547 is described below
commit 488a5470d2a172019f8c608bb0fab0b78ed14bdc
Author: kunal642 <[email protected]>
AuthorDate: Wed Oct 23 16:14:38 2019 +0530
[CARBONDATA-3555] Move filter related methods under DataMapFilter
1. Make DataMapFilter a holder for the filter Expression and the FilterResolverIntf objects, so that all the major APIs can accept a DataMapFilter as an argument.
2. Move all the filter resolving methods inside DataMapFilter for ease of use.
3. Fix a Datasource issue where invalid datamaps were getting pruned.
This closes #3419
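For orientation, a minimal sketch of the API shape this change introduces (all names are taken from the diff below; the surrounding driver code is illustrative only, not part of this commit):

    // DataMapFilter now holds both the Expression and the FilterResolverIntf.
    DataMapFilter filter = new DataMapFilter(carbonTable, expression);
    // Major APIs accept the holder instead of a raw Expression:
    CarbonInputFormat.setFilterPredicates(conf, filter);
    // The resolver tree is created by the holder itself (lazily if requested):
    FilterResolverIntf resolver = filter.getResolver();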
---
.../carbondata/core/datamap/DataMapFilter.java | 110 +++++++++++++++++----
.../core/datamap/DataMapStoreManager.java | 15 ++-
.../apache/carbondata/core/datamap/Segment.java | 5 +
.../carbondata/core/datamap/TableDataMap.java | 2 +-
.../indexstore/blockletindex/BlockDataMap.java | 20 +---
.../core/metadata/schema/table/CarbonTable.java | 23 -----
.../core/metadata/schema/table/TableInfo.java | 4 +
.../scan/executor/impl/AbstractQueryExecutor.java | 38 ++++---
.../carbondata/core/scan/model/QueryModel.java | 29 ++----
.../core/scan/model/QueryModelBuilder.java | 30 +++---
dev/findbugs-exclude.xml | 8 ++
.../hadoop/api/CarbonFileInputFormat.java | 12 ++-
.../carbondata/hadoop/api/CarbonInputFormat.java | 36 ++++---
.../hadoop/api/CarbonTableInputFormat.java | 33 ++++---
.../hadoop/stream/StreamRecordReader.java | 4 +-
.../carbondata/hadoop/testutil/StoreCreator.java | 3 +-
.../hadoop/util/CarbonInputFormatUtil.java | 7 +-
.../hadoop/ft/CarbonTableInputFormatTest.java | 12 ++-
.../carbondata/presto/CarbondataPageSource.java | 11 ++-
.../carbondata/presto/impl/CarbonTableReader.java | 9 +-
...ryWithColumnMetCacheAndCacheLevelProperty.scala | 7 +-
.../filterexpr/FilterProcessorTestCase.scala | 7 ++
.../filterexpr/TestImplicitFilterExpression.scala | 5 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 38 ++++---
.../command/carbonTableSchemaCommon.scala | 5 +-
.../vectorreader/VectorizedCarbonRecordReader.java | 4 +-
.../execution/datasources/CarbonFileIndex.scala | 6 +-
.../datasources/SparkCarbonFileFormat.scala | 12 ++-
.../apache/carbondata/store/SparkCarbonStore.scala | 4 +-
.../spark/sql/CarbonDatasourceHadoopRelation.scala | 7 +-
.../command/management/CarbonAddLoadCommand.scala | 1 +
.../strategy/CarbonLateDecodeStrategy.scala | 2 +
.../merger/CarbonCompactionExecutor.java | 4 +-
.../carbondata/sdk/file/CarbonReaderBuilder.java | 4 +-
.../carbondata/sdk/file/CarbonSchemaReader.java | 52 ++++------
.../apache/carbondata/store/LocalCarbonStore.java | 4 +-
36 files changed, 322 insertions(+), 251 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index 46f37db..23805e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -29,7 +30,11 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
+import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
/**
@@ -37,7 +42,9 @@ import org.apache.carbondata.core.util.ObjectSerializationUtil;
*/
public class DataMapFilter implements Serializable {
- private CarbonTable table;
+ private static final long serialVersionUID = 6276855832288220240L;
+
+ private transient CarbonTable table;
private Expression expression;
@@ -45,9 +52,16 @@ public class DataMapFilter implements Serializable {
private String serializedExpression;
+ private SegmentProperties properties;
+
public DataMapFilter(CarbonTable table, Expression expression) {
- this.table = table;
+ this(table, expression, false);
+ }
+
+ public DataMapFilter(CarbonTable table, Expression expression, boolean lazyResolve) {
this.expression = expression;
+ this.table = table;
+ resolve(lazyResolve);
if (expression != null) {
checkIfFilterColumnExistsInTable();
try {
@@ -56,10 +70,19 @@ public class DataMapFilter implements Serializable {
throw new RuntimeException("Error while serializing the exception", e);
}
}
- resolve();
}
- public Expression getNewCopyOfExpression() {
+ public DataMapFilter(FilterResolverIntf resolver) {
+ this.resolver = resolver;
+ }
+
+ public DataMapFilter(SegmentProperties properties, CarbonTable table, Expression expression) {
+ this(table, expression);
+ this.properties = properties;
+ resolve(false);
+ }
+
+ Expression getNewCopyOfExpression() {
if (expression != null) {
try {
return (Expression) ObjectSerializationUtil
@@ -72,6 +95,10 @@ public class DataMapFilter implements Serializable {
}
}
+ public void setTable(CarbonTable table) {
+ this.table = table;
+ }
+
private Set<String> extractColumnExpressions(Expression expression) {
Set<String> columnExpressionList = new HashSet<>();
for (Expression expressions: expression.getChildren()) {
@@ -111,14 +138,19 @@ public class DataMapFilter implements Serializable {
}
}
- public DataMapFilter(FilterResolverIntf resolver) {
- this.resolver = resolver;
- }
-
- private void resolve() {
+ /**
+ * Process the FilterExpression and create FilterResolverIntf.
+ *
+ * @param lazyResolve whether to create FilterResolverIntf immediately or not.
+ *                    Pass true if DataMapFilter object is created before checking the valid
+ * segments for pruning.
+ */
+ public void resolve(boolean lazyResolve) {
if (expression != null) {
- table.processFilterExpression(expression, null, null);
- resolver = CarbonTable.resolveFilter(expression, table.getAbsoluteTableIdentifier());
+ processFilterExpression();
+ if (!lazyResolve) {
+ resolver = resolveFilter();
+ }
}
}
@@ -131,13 +163,12 @@ public class DataMapFilter implements Serializable {
}
public FilterResolverIntf getResolver() {
+ if (resolver == null) {
+ resolver = resolveFilter();
+ }
return resolver;
}
- public void setResolver(FilterResolverIntf resolver) {
- this.resolver = resolver;
- }
-
public boolean isEmpty() {
return resolver == null;
}
@@ -149,10 +180,51 @@ public class DataMapFilter implements Serializable {
if (!table.isTransactionalTable()) {
return false;
}
- if (table.hasColumnDrift() && RestructureUtil
- .hasColumnDriftOnSegment(table, segmentProperties)) {
- return false;
+ return !(table.hasColumnDrift() && RestructureUtil
+ .hasColumnDriftOnSegment(table, segmentProperties));
+ }
+
+ public void processFilterExpression() {
+ processFilterExpression(null, null);
+ }
+
+ public void processFilterExpression(boolean[] isFilterDimensions,
+ boolean[] isFilterMeasures) {
+ processFilterExpressionWithoutRange(isFilterDimensions, isFilterMeasures);
+ if (null != expression) {
+ // Optimize Filter Expression and fit RANGE filters is conditions apply.
+ FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
+ rangeFilterOptimizer.optimizeFilter();
+ }
+ }
+
+ public void processFilterExpressionWithoutRange(boolean[] isFilterDimensions,
+ boolean[] isFilterMeasures) {
+ QueryModel.FilterProcessVO processVO;
+ if (properties != null) {
+ processVO =
+ new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
+ new ArrayList<CarbonDimension>());
+ } else {
+ processVO =
+ new QueryModel.FilterProcessVO(table.getDimensionByTableName(table.getTableName()),
+ table.getMeasureByTableName(table.getTableName()),
+ table.getImplicitDimensionByTableName(table.getTableName()));
+ }
+ QueryModel.processFilterExpression(processVO, expression, isFilterDimensions, isFilterMeasures,
+ table);
+ }
+
+ /**
+ * Resolve the filter expression.
+ */
+ private FilterResolverIntf resolveFilter() {
+ try {
+ FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+ return filterExpressionProcessor
+ .getFilterResolver(expression, table.getAbsoluteTableIdentifier());
+ } catch (Exception e) {
+ throw new RuntimeException("Error while resolving filter expression", e);
}
- return true;
}
}
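The lazy-resolve contract in a nutshell (a sketch, assuming only the constructors and methods added above):

    // lazyResolve = true defers building the FilterResolverIntf ...
    DataMapFilter filter = new DataMapFilter(table, expression, true);
    // ... until resolve(false) is called once valid segments are known,
    filter.resolve(false);
    // ... or until getResolver() builds it on demand via resolveFilter().
    FilterResolverIntf resolver = filter.getResolver();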
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 5d247f9..0f0a62f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -37,7 +37,6 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -343,7 +342,7 @@ public final class DataMapStoreManager {
if (allDataMaps.size() > 0 && !CollectionUtils.isEmpty(allDataMaps.get(tableId))
    && !allDataMaps.get(tableId).get(0).getTable().getTableInfo().getFactTable()
    .getListOfColumns().equals(table.getTableInfo().getFactTable().getListOfColumns())) {
- clearDataMaps(table.getCarbonTableIdentifier());
+ clearDataMaps(tableId);
tableIndices = null;
}
TableDataMap dataMap = null;
@@ -555,7 +554,7 @@ public final class DataMapStoreManager {
}
}
segmentRefreshMap.remove(tableId);
- clearDataMaps(identifier.getCarbonTableIdentifier());
+ clearDataMaps(tableId);
allDataMaps.remove(tableId);
tablePathMap.remove(tableId);
}
@@ -585,8 +584,8 @@ public final class DataMapStoreManager {
/**
* this methods clears the datamap of table from memory
*/
- public void clearDataMaps(CarbonTableIdentifier carbonTableIdentifier) {
- List<TableDataMap> tableIndices = allDataMaps.get(carbonTableIdentifier.getTableId());
+ public void clearDataMaps(String tableId) {
+ List<TableDataMap> tableIndices = allDataMaps.get(tableId);
if (tableIndices != null) {
for (TableDataMap tableDataMap : tableIndices) {
if (tableDataMap != null) {
@@ -597,8 +596,8 @@ public final class DataMapStoreManager {
}
}
}
- allDataMaps.remove(carbonTableIdentifier.getTableId());
- tablePathMap.remove(carbonTableIdentifier.getTableId());
+ allDataMaps.remove(tableId);
+ tablePathMap.remove(tableId);
}
/**
@@ -783,7 +782,7 @@ public final class DataMapStoreManager {
}
allDataMaps.put(carbonTable.getTableId(), remainingDataMaps);
} else {
- clearDataMaps(carbonTable.getCarbonTableIdentifier());
+ clearDataMaps(carbonTable.getTableId());
// clear the segment properties cache from executor
SegmentPropertiesAndSchemaHolder.getInstance()
.invalidate(carbonTable.getAbsoluteTableIdentifier());
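With the map now keyed purely by the table id string, call sites clear datamaps like this (a sketch; DataMapStoreManager.getInstance() is the existing singleton accessor):

    // before: clearDataMaps(carbonTable.getCarbonTableIdentifier())
    DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getTableId());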
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 5279094..38d6e9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -316,6 +316,11 @@ public class Segment implements Serializable, Writable {
}
public String getSegmentPath() {
+ if (segmentPath == null) {
+ if (loadMetadataDetails != null) {
+ segmentPath = loadMetadataDetails.getPath();
+ }
+ }
return segmentPath;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 804e895..d63a39c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -132,7 +132,7 @@ public final class TableDataMap extends OperationEventListener {
// As 0.1 million files block pruning can take only 1 second.
// Doing multi-thread for smaller values is not recommended as
// driver should have minimum threads opened to support multiple concurrent queries.
- if (filter.isEmpty()) {
+ if (filter == null || filter.isEmpty()) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 90aed35..7d358eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
@@ -53,7 +54,6 @@ import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -61,10 +61,7 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.FilterUtil;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
@@ -777,19 +774,8 @@ public class BlockDataMap extends CoarseGrainDataMap
@Override
public List<Blocklet> prune(Expression expression, SegmentProperties properties,
    List<PartitionSpec> partitions, CarbonTable carbonTable) throws IOException {
- FilterResolverIntf filterResolverIntf = null;
- if (expression != null) {
- QueryModel.FilterProcessVO processVO =
- new QueryModel.FilterProcessVO(properties.getDimensions(), properties.getMeasures(),
- new ArrayList<CarbonDimension>());
- QueryModel.processFilterExpression(processVO, expression, null, null, carbonTable);
- // Optimize Filter Expression and fit RANGE filters is conditions apply.
- FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
- rangeFilterOptimizer.optimizeFilter();
- filterResolverIntf =
- CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier());
- }
- return prune(filterResolverIntf, properties, partitions);
+ return prune(new DataMapFilter(properties, carbonTable, expression).getResolver(), properties,
+ partitions);
}
@Override
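The removed inline steps (building a FilterProcessVO, running the RangeFilterOptmizer, then resolving) are all folded into DataMapFilter; the replacement one-liner above is roughly equivalent to this sketch:

    // new DataMapFilter(properties, carbonTable, expression) runs
    // processFilterExpression() (VO setup + range optimization) while resolving,
    // and getResolver() performs the former CarbonTable.resolveFilter step.
    FilterResolverIntf resolver =
        new DataMapFilter(properties, carbonTable, expression).getResolver();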
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 1ca2b89..2c13ec1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -55,10 +55,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -1119,26 +1116,6 @@ public class CarbonTable implements Serializable, Writable {
return dataSize + indexSize;
}
- public void processFilterExpression(Expression filterExpression, boolean[] isFilterDimensions,
- boolean[] isFilterMeasures) {
- processFilterExpressionWithoutRange(filterExpression, isFilterDimensions, isFilterMeasures);
- if (null != filterExpression) {
- // Optimize Filter Expression and fit RANGE filters is conditions apply.
- FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(filterExpression);
- rangeFilterOptimizer.optimizeFilter();
- }
- }
-
- public void processFilterExpressionWithoutRange(Expression filterExpression,
- boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
- QueryModel.FilterProcessVO processVO =
- new QueryModel.FilterProcessVO(getDimensionByTableName(getTableName()),
- getMeasureByTableName(getTableName()), getImplicitDimensionByTableName(getTableName()));
- QueryModel
- .processFilterExpression(processVO, filterExpression, isFilterDimensions, isFilterMeasures,
- this);
- }
-
public boolean isTransactionalTable() {
return isTransactionalTable;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 4aeba9d..030f838 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -397,4 +397,8 @@ public class TableInfo implements Serializable, Writable {
public boolean hasColumnDrift() {
return hasColumnDrift;
}
+
+ public String getTablePath() {
+ return tablePath;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 36a4727..a174a91 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.ReusableDataBuffer;
@@ -56,10 +57,8 @@ import org.apache.carbondata.core.scan.executor.exception.QueryExecutionExceptio
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
-import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
@@ -147,9 +146,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// and measure column start index
queryProperties.filterMeasures = new HashSet<>();
queryProperties.complexFilterDimension = new HashSet<>();
- QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getFilterExpressionResolverTree(),
- queryProperties.complexFilterDimension, queryProperties.filterMeasures);
-
+ if (queryModel.getDataMapFilter() != null) {
+ QueryUtil.getAllFilterDimensionsAndMeasures(queryModel.getDataMapFilter().getResolver(),
+ queryProperties.complexFilterDimension, queryProperties.filterMeasures);
+ }
CarbonTable carbonTable = queryModel.getTable();
queryStatistic = new QueryStatistic();
@@ -329,18 +329,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
}
private void createFilterExpression(QueryModel queryModel, SegmentProperties properties) {
- Expression expression = queryModel.getFilterExpression();
- if (expression != null) {
- QueryModel.FilterProcessVO processVO = new QueryModel.FilterProcessVO(
- properties.getDimensions(),
- properties.getMeasures(),
- new ArrayList<CarbonDimension>());
- QueryModel.processFilterExpression(processVO, expression, null, null, queryModel.getTable());
- // Optimize Filter Expression and fit RANGE filters is conditions apply.
- FilterOptimizer rangeFilterOptimizer = new RangeFilterOptmizer(expression);
- rangeFilterOptimizer.optimizeFilter();
- queryModel.setFilterExpressionResolverTree(
- CarbonTable.resolveFilter(expression, queryModel.getAbsoluteTableIdentifier()));
+ if (queryModel.getDataMapFilter() != null) {
+ if (!queryModel.getDataMapFilter().isResolvedOnSegment(properties)) {
+ DataMapFilter expression = new DataMapFilter(properties, queryModel.getTable(),
+ queryModel.getDataMapFilter().getExpression());
+ queryModel.setDataMapFilter(expression);
+ }
}
}
@@ -520,10 +514,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
queryProperties.columnToDictionaryMapping,
queryProperties.complexFilterDimension));
IndexKey startIndexKey = null;
IndexKey endIndexKey = null;
- if (null != queryModel.getFilterExpressionResolverTree()) {
+ if (null != queryModel.getDataMapFilter()) {
+ FilterResolverIntf filterResolverIntf;
// loading the filter executor tree for filter evaluation
- blockExecutionInfo.setFilterExecuterTree(FilterUtil
- .getFilterExecuterTree(queryModel.getFilterExpressionResolverTree(), segmentProperties,
+ filterResolverIntf = queryModel.getDataMapFilter().getResolver();
+ blockExecutionInfo.setFilterExecuterTree(
+ FilterUtil.getFilterExecuterTree(filterResolverIntf, segmentProperties,
blockExecutionInfo.getComlexDimensionInfoMap()));
}
try {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 4d10492..1a94608 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -32,7 +33,6 @@ import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.UnknownExpression;
import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -58,15 +58,11 @@ public class QueryModel {
* query id
*/
private String queryId;
- /**
- * filter tree
- */
- private FilterResolverIntf filterExpressionResolverTree;
/**
* filter expression tree
*/
- private Expression filterExpression;
+ private DataMapFilter dataMapFilter;
/**
* table block information in which query will be executed
@@ -276,23 +272,12 @@ public class QueryModel {
this.tableBlockInfos = tableBlockInfos;
}
- /**
- * @return the filterEvaluatorTree
- */
- public FilterResolverIntf getFilterExpressionResolverTree() {
- return filterExpressionResolverTree;
- }
-
- public void setFilterExpressionResolverTree(FilterResolverIntf filterExpressionResolverTree) {
- this.filterExpressionResolverTree = filterExpressionResolverTree;
- }
-
- public Expression getFilterExpression() {
- return filterExpression;
+ public DataMapFilter getDataMapFilter() {
+ return dataMapFilter;
}
- public void setFilterExpression(Expression filterExpression) {
- this.filterExpression = filterExpression;
+ public void setDataMapFilter(DataMapFilter dataMapFilter) {
+ this.dataMapFilter = dataMapFilter;
}
/**
@@ -416,7 +401,7 @@ public class QueryModel {
return String.format("scan on table %s.%s, %d projection columns with
filter (%s)",
table.getDatabaseName(), table.getTableName(),
projection.getDimensions().size() + projection.getMeasures().size(),
- filterExpressionResolverTree.getFilterExpression().toString());
+ dataMapFilter.getExpression().toString());
}
public boolean isFreeUnsafeMemory() {
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
index e91d14d..61c9b5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -25,12 +25,11 @@ import java.util.Map;
import java.util.Objects;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.DataTypeConverter;
import org.apache.log4j.Logger;
@@ -39,7 +38,7 @@ public class QueryModelBuilder {
private CarbonTable table;
private QueryProjection projection;
- private Expression filterExpression;
+ private DataMapFilter dataMapFilter;
private DataTypeConverter dataTypeConverter;
private boolean forcedDetailRawQuery;
private boolean readPageByPage;
@@ -287,8 +286,8 @@ public class QueryModelBuilder {
return this;
}
- public QueryModelBuilder filterExpression(Expression filterExpression) {
- this.filterExpression = filterExpression;
+ public QueryModelBuilder filterExpression(DataMapFilter filterExpression) {
+ this.dataMapFilter = filterExpression;
return this;
}
@@ -326,21 +325,18 @@ public class QueryModelBuilder {
// set the filter to the query model in order to filter blocklet before scan
boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()];
boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()];
- // In case of Dictionary Include Range Column we donot optimize the range expression
- if (isConvertToRangeFilter()) {
- table.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures);
- } else {
- table.processFilterExpressionWithoutRange(filterExpression, isFilterDimensions,
- isFilterMeasures);
- }
queryModel.setIsFilterDimensions(isFilterDimensions);
queryModel.setIsFilterMeasures(isFilterMeasures);
- FilterResolverIntf filterIntf =
- CarbonTable.resolveFilter(filterExpression, table.getAbsoluteTableIdentifier());
- queryModel.setFilterExpressionResolverTree(filterIntf);
- } else {
- queryModel.setFilterExpression(filterExpression);
+ // In case of Dictionary Include Range Column we donot optimize the range expression
+ if (dataMapFilter != null) {
+ if (isConvertToRangeFilter()) {
+ dataMapFilter.processFilterExpression(isFilterDimensions, isFilterMeasures);
+ } else {
+ dataMapFilter.processFilterExpressionWithoutRange(isFilterDimensions, isFilterMeasures);
+ }
+ }
}
+ queryModel.setDataMapFilter(dataMapFilter);
return queryModel;
}
}
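A hedged sketch of the resulting builder usage (the projection columns and the converter are illustrative placeholders, not part of this commit):

    QueryModel queryModel = new QueryModelBuilder(carbonTable)
        .projectColumns(projectColumns)                  // assumed String[] of column names
        .filterExpression(new DataMapFilter(carbonTable, expression))
        .dataConverter(dataTypeConverter)                // assumed DataTypeConverter instance
        .build();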
diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml
index cac0eb4..6690863 100644
--- a/dev/findbugs-exclude.xml
+++ b/dev/findbugs-exclude.xml
@@ -110,4 +110,12 @@
<Class name="org.apache.carbondata.core.datamap.Segment"/>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>
+ <Match>
+ <Class name="org.apache.carbondata.core.datamap.DataMapFilter"/>
+ <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+ </Match>
+ <Match>
+ <Class name="org.apache.carbondata.core.datamap.DataMapFilter"/>
+ <Bug pattern="SE_BAD_FIELD"/>
+ </Match>
</FindBugsFilter>
\ No newline at end of file
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 26d3231..8eafd11 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -41,7 +42,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
-import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -133,8 +133,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
}
// this will be null in case of corrupt schema file.
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
- Expression filter = getFilterPredicates(job.getConfiguration());
-
+ DataMapFilter filter = getFilterPredicates(job.getConfiguration());
// if external table Segments are found, add it to the List
List<Segment> externalTableSegments = new ArrayList<Segment>();
@@ -168,6 +167,9 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// useBlockDataMap would be false in case of SDK when user has not provided any filter, In
// this case we don't want to load block/blocklet datamap. It would be true in all other
// scenarios
+ if (filter != null) {
+ filter.resolve(false);
+ }
if (useBlockDataMap) {
// do block filtering and get split
splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
@@ -251,7 +253,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
* @return
* @throws IOException
*/
- private List<InputSplit> getSplits(JobContext job, Expression expression,
+ private List<InputSplit> getSplits(JobContext job, DataMapFilter dataMapFilter,
List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
List<Integer> oldPartitionIdList) throws IOException {
@@ -260,7 +262,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// for each segment fetch blocks matching filter in Driver BTree
List<CarbonInputSplit> dataBlocksOfSegment =
- getDataBlocksOfSegment(job, carbonTable, expression, matchedPartitions, validSegments,
+ getDataBlocksOfSegment(job, carbonTable, dataMapFilter, matchedPartitions, validSegments,
partitionInfo, oldPartitionIdList, new ArrayList<Segment>(), new ArrayList<String>());
numBlocks = dataBlocksOfSegment.size();
result.addAll(dataBlocksOfSegment);
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8842aa4..30a3202 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -230,8 +230,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
* @para DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
m filterExpression
*/
- public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
- if (filterExpression == null) {
+ public static void setFilterPredicates(Configuration configuration,
+ DataMapFilter filterExpression) {
+ if (filterExpression == null || filterExpression.getExpression() == null) {
return;
}
try {
@@ -472,14 +473,19 @@ m filterExpression
}
}
- protected Expression getFilterPredicates(Configuration configuration) {
+ protected DataMapFilter getFilterPredicates(Configuration configuration) {
try {
String filterExprString = configuration.get(FILTER_PREDICATE);
if (filterExprString == null) {
return null;
}
- Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
- return (Expression) filter;
+ DataMapFilter filter =
+ (DataMapFilter) ObjectSerializationUtil.convertStringToObject(filterExprString);
+ if (filter != null) {
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+ filter.setTable(carbonTable);
+ }
+ return filter;
} catch (IOException e) {
throw new RuntimeException("Error while reading filter expression", e);
}
@@ -489,7 +495,7 @@ m filterExpression
* get data blocks of given segment
*/
protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
- Expression expression, BitSet matchedPartitions, List<Segment> segmentIds,
+ DataMapFilter expression, BitSet matchedPartitions, List<Segment> segmentIds,
PartitionInfo partitionInfo, List<Integer> oldPartitionIdList,
List<Segment> invalidSegments, List<String> segmentsToBeRefreshed) throws IOException {
@@ -557,11 +563,12 @@ m filterExpression
* First pruned with default blocklet datamap, then pruned with CG and FG datamaps
*/
private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable carbonTable,
- Expression expression, List<Segment> segmentIds, List<Segment> invalidSegments,
+ DataMapFilter filter, List<Segment> segmentIds, List<Segment> invalidSegments,
List<String> segmentsToBeRefreshed) throws IOException {
ExplainCollector.addPruningInfo(carbonTable.getTableName());
- final DataMapFilter filter = new DataMapFilter(carbonTable, expression);
- ExplainCollector.setFilterStatement(expression == null ? "none" : expression.getStatement());
+ filter = filter == null ? new DataMapFilter(carbonTable, null) : filter;
+ ExplainCollector.setFilterStatement(
+ filter.getExpression() == null ? "none" : filter.getExpression().getStatement());
boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
@@ -712,7 +719,7 @@ m filterExpression
}
public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext,
- Expression filterExpression) throws IOException {
+ DataMapFilter dataMapFilter) throws IOException {
Configuration configuration = taskAttemptContext.getConfiguration();
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
@@ -724,13 +731,14 @@ m filterExpression
} else {
projectColumns = new String[]{};
}
- checkAndAddImplicitExpression(filterExpression, inputSplit);
- QueryModel queryModel = new QueryModelBuilder(carbonTable)
+ if (dataMapFilter != null) {
+ checkAndAddImplicitExpression(dataMapFilter.getExpression(), inputSplit);
+ }
+ return new QueryModelBuilder(carbonTable)
.projectColumns(projectColumns)
- .filterExpression(filterExpression)
+ .filterExpression(dataMapFilter)
.dataConverter(getDataTypeConverter(configuration))
.build();
- return queryModel;
}
/**
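Taken together, the two sides of CarbonInputFormat now round-trip the whole DataMapFilter through the job configuration; since the CarbonTable field is transient, it is re-attached after deserialization. A sketch using only the methods shown above:

    // driver side: serialize the holder into the configuration
    CarbonInputFormat.setFilterPredicates(configuration,
        new DataMapFilter(carbonTable, expression));
    // read side: getFilterPredicates deserializes it and restores the table
    DataMapFilter filter = getFilterPredicates(configuration);
    // internally: filter.setTable(getOrCreateCarbonTable(configuration));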
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 74a4d6e..f4a6899 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
@@ -47,7 +48,6 @@ import org.apache.carbondata.core.profiler.ExplainCollector;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
-import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.statusmanager.FileFormat;
@@ -174,15 +174,17 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
readCommittedScope);
// process and resolve the expression
- Expression filter = getFilterPredicates(job.getConfiguration());
+ DataMapFilter dataMapFilter = getFilterPredicates(job.getConfiguration());
// this will be null in case of corrupt schema file.
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
+ if (dataMapFilter != null) {
+ dataMapFilter.resolve(false);
+ }
// prune partitions for filter query on partition table
BitSet matchedPartitions = null;
if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
- carbonTable.processFilterExpression(filter, null, null);
- matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, null);
+ matchedPartitions = setMatchedPartitions(null, dataMapFilter, partitionInfo, null);
if (matchedPartitions != null) {
if (matchedPartitions.cardinality() == 0) {
return new ArrayList<InputSplit>();
@@ -194,7 +196,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
// do block filtering and get split
List<InputSplit> splits =
- getSplits(job, filter, filteredSegmentToAccess, matchedPartitions, partitionInfo,
+ getSplits(job, dataMapFilter, filteredSegmentToAccess, matchedPartitions, partitionInfo,
null, updateStatusManager, segments.getInvalidSegments());
// add all splits of streaming
List<InputSplit> splitsOfStreaming = getSplitsOfStreaming(job, streamSegments, carbonTable);
@@ -290,11 +292,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
long maxSize = getMaxSplitSize(job);
if (filterResolverIntf == null) {
if (carbonTable != null) {
- Expression filter = getFilterPredicates(job.getConfiguration());
+ DataMapFilter filter = getFilterPredicates(job.getConfiguration());
if (filter != null) {
- carbonTable.processFilterExpression(filter, null, null);
- filterResolverIntf =
- CarbonTable.resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier());
+ filter.processFilterExpression();
+ filterResolverIntf = filter.getResolver();
}
}
}
@@ -364,13 +365,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
setSegmentsToAccess(job.getConfiguration(), segmentList);
// process and resolve the expression
- Expression filter = getFilterPredicates(job.getConfiguration());
+ DataMapFilter filter = getFilterPredicates(job.getConfiguration());
CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
// this will be null in case of corrupt schema file.
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
- carbonTable.processFilterExpression(filter, null, null);
+ if (filter != null) {
+ filter.processFilterExpression();
+ }
// prune partitions for filter query on partition table
String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
// matchedPartitions records partitionIndex, not partitionId
@@ -405,7 +408,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
* @param oldPartitionIdList only used in alter table command
* @return
*/
- private BitSet setMatchedPartitions(String partitionIds, Expression filter,
+ private BitSet setMatchedPartitions(String partitionIds, DataMapFilter filter,
PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) {
BitSet matchedPartitions = null;
if (null != partitionIds) {
@@ -418,8 +421,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
}
} else {
if (null != filter) {
- matchedPartitions =
- new FilterExpressionProcessor().getFilteredPartitions(filter, partitionInfo);
+ matchedPartitions = new FilterExpressionProcessor()
+ .getFilteredPartitions(filter.getExpression(), partitionInfo);
}
}
return matchedPartitions;
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
* @return
* @throws IOException
*/
- private List<InputSplit> getSplits(JobContext job, Expression expression,
+ private List<InputSplit> getSplits(JobContext job, DataMapFilter expression,
List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
List<Integer> oldPartitionIdList, SegmentUpdateStatusManager updateStatusManager,
List<Segment> invalidSegments) throws IOException {
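Note the ordering that the lazyResolve flag enables in getSplits above: the filter is deserialized first, but resolution is forced only after the valid segments have been identified (a compressed sketch of the flow shown in this hunk):

    DataMapFilter dataMapFilter = getFilterPredicates(job.getConfiguration());
    // ... identify filteredSegmentToAccess, prune partitions ...
    if (dataMapFilter != null) {
      dataMapFilter.resolve(false);  // build the FilterResolverIntf now
    }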
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index dca5ee4..c272f42 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -213,7 +213,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
}
// initialize filter
- if (null != model.getFilterExpressionResolverTree()) {
+ if (null != model.getDataMapFilter()) {
initializeFilter();
} else if (projection.length == 0) {
skipScanData = true;
@@ -237,7 +237,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
- FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+ FilterResolverIntf resolverIntf = model.getDataMapFilter().getResolver();
filter =
FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, complexDimensionInfoMap);
// for row filter, we need update column index
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 5c6653b..bc54a23 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -181,9 +181,10 @@ public class StoreCreator {
/**
* Create store without any restructure
*/
- public void createCarbonStore() throws Exception {
+ public CarbonLoadModel createCarbonStore() throws Exception {
CarbonLoadModel loadModel = createTableAndLoadModel();
loadData(loadModel, storePath);
+ return loadModel;
}
/**
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 4d3f73a..872ebb5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -25,6 +25,7 @@ import java.util.Locale;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapJob;
import org.apache.carbondata.core.datamap.DataMapUtil;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -102,13 +103,13 @@ public class CarbonInputFormatUtil {
.setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
CarbonProjection columnProjection = new CarbonProjection(projectionColumns);
return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
- filterExpression, columnProjection, dataMapJob);
+ new DataMapFilter(carbonTable, filterExpression, true), columnProjection, dataMapJob);
}
private static <V> CarbonTableInputFormat<V> createInputFormat(
Configuration conf,
AbsoluteTableIdentifier identifier,
- Expression filterExpression,
+ DataMapFilter dataMapFilter,
CarbonProjection columnProjection,
DataMapJob dataMapJob) throws InvalidConfigurationException, IOException {
CarbonTableInputFormat<V> format = new CarbonTableInputFormat<>();
@@ -116,7 +117,7 @@ public class CarbonInputFormatUtil {
conf,
identifier.appendWithLocalPrefix(identifier.getTablePath()));
CarbonInputFormat.setQuerySegment(conf, identifier);
- CarbonInputFormat.setFilterPredicates(conf, filterExpression);
+ CarbonInputFormat.setFilterPredicates(conf, dataMapFilter);
CarbonInputFormat.setColumnProjection(conf, columnProjection);
if (dataMapJob != null) {
DataMapUtil.setDataMapJob(conf, dataMapJob);
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
index 69a3092..bdcdcf7 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
@@ -40,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.carbondata.hadoop.testutil.StoreCreator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -57,6 +59,8 @@ import org.junit.Test;
public class CarbonTableInputFormatTest {
// changed setUp to static init block to avoid un wanted multiple time store creation
private static StoreCreator creator;
+
+ private static CarbonLoadModel loadModel;
static {
CarbonProperties.getInstance().
addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
@@ -67,7 +71,7 @@ public class CarbonTableInputFormatTest {
try {
creator = new StoreCreator(new File("target/store").getAbsolutePath(),
new File("../hadoop/src/test/resources/data.csv").getCanonicalPath());
- creator.createCarbonStore();
+ loadModel = creator.createCarbonStore();
} catch (Exception e) {
Assert.fail("create table failed: " + e.getMessage());
}
@@ -84,7 +88,8 @@ public class CarbonTableInputFormatTest {
CarbonTableInputFormat.setTableName(job.getConfiguration(),
creator.getAbsoluteTableIdentifier().getTableName());
Expression expression = new EqualToExpression(new ColumnExpression("country", DataTypes.STRING),
new LiteralExpression("china", DataTypes.STRING));
- CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), expression);
+ CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(),
+ new DataMapFilter(loadModel.getCarbonDataLoadSchema().getCarbonTable(), expression));
List splits = carbonInputFormat.getSplits(job);
Assert.assertTrue(splits != null);
@@ -256,7 +261,8 @@ public class CarbonTableInputFormatTest {
CarbonTableInputFormat.setColumnProjection(job.getConfiguration(),
projection);
}
if (filter != null) {
- CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+ CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(),
+ new DataMapFilter(loadModel.getCarbonDataLoadSchema().getCarbonTable(), filter));
}
CarbonTableInputFormat.setDatabaseName(job.getConfiguration(),
abs.getCarbonTableIdentifier().getDatabaseName());
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index b099167..c62c131 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -26,6 +26,7 @@ import java.util.Objects;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
@@ -38,7 +39,6 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
@@ -392,7 +392,8 @@ class CarbondataPageSource implements ConnectorPageSource {
conf.set("query.id", queryId);
JobConf jobConf = new JobConf(conf);
CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
- PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
+ new DataMapFilter(carbonTable,
+ PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate())),
carbonProjection);
TaskAttemptContextImpl hadoopAttemptContext =
new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
@@ -417,12 +418,12 @@ class CarbondataPageSource implements ConnectorPageSource {
/**
* @param conf
* @param carbonTable
- * @param filterExpression
+ * @param dataMapFilter
* @param projection
* @return
*/
private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
- CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
+ CarbonTable carbonTable, DataMapFilter dataMapFilter, CarbonProjection projection) {
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
@@ -436,7 +437,7 @@ class CarbondataPageSource implements ConnectorPageSource {
} catch (Exception e) {
throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
}
- CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+ CarbonTableInputFormat.setFilterPredicates(conf, dataMapFilter);
CarbonTableInputFormat.setColumnProjection(conf, projection);
return format;
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index d0f82f1..18c959e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -273,8 +274,8 @@ public class CarbonTableReader {
try {
CarbonTableInputFormat.setTableInfo(config, tableInfo);
CarbonTableInputFormat carbonTableInputFormat =
- createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), filters,
- filteredPartitions);
+ createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(),
+ new DataMapFilter(carbonTable, filters, true), filteredPartitions);
Job job = Job.getInstance(jobConf);
List<InputSplit> splits = carbonTableInputFormat.getSplits(job);
Gson gson = new Gson();
@@ -356,12 +357,12 @@ public class CarbonTableReader {
}
private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
- AbsoluteTableIdentifier identifier, Expression filterExpression,
+ AbsoluteTableIdentifier identifier, DataMapFilter dataMapFilter,
List<PartitionSpec> filteredPartitions) throws IOException {
CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
CarbonTableInputFormat
.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
- CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+ CarbonTableInputFormat.setFilterPredicates(conf, dataMapFilter);
if (filteredPartitions.size() != 0) {
CarbonTableInputFormat.setPartitionsToPrune(conf, filteredPartitions);
}
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index 5c34fe4..59a5a5e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -27,13 +27,11 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.dev.DataMap
-import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager, Segment, TableDataMap}
-import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
+import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapFilter, DataMapStoreManager, Segment, TableDataMap}
import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap, BlockletDataMapRowIndexes}
import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema
import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
@@ -282,8 +280,7 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     val notEqualsExpression = new NotEqualsExpression(columnExpression, literalNullExpression)
     val equalsExpression = new NotEqualsExpression(columnExpression, literalValueExpression)
     val andExpression = new AndExpression(notEqualsExpression, equalsExpression)
-    val resolveFilter: FilterResolverIntf =
-      CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
+    val resolveFilter: FilterResolverIntf = new DataMapFilter(carbonTable, andExpression).getResolver()
     val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
     val segment = new Segment("0", new TableStatusReadCommittedScope(carbonTable
       .getAbsoluteTableIdentifier, new Configuration(false)))
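
The test now obtains the resolver through DataMapFilter instead of the removed
CarbonTable.resolveFilter helper. In isolation the pattern looks like this (a sketch; any
table/expression pair works):

    import org.apache.carbondata.core.datamap.DataMapFilter;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

    static FilterResolverIntf resolve(CarbonTable table, Expression expression) {
      // getResolver() hands back the resolved filter tree that datamap
      // choosing and pruning operate on
      return new DataMapFilter(table, expression).getResolver();
    }
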
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
index 147756f..861bbd4 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
@@ -392,4 +392,11 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
+
+ test("test if query is giving empty results for table with no segments") {
+ sql("drop table if exists q1")
+    sql("create table q1(a string) stored by 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='a')")
+ assert(sql("select * from q1 where a > 10").count() == 0)
+ sql("drop table if exists q1")
+ }
}
\ No newline at end of file
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
index e28eaad..09ba48b 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, TrueExpression}
@@ -101,11 +102,11 @@ class TestImplicitFilterExpression extends QueryTest with BeforeAndAfterAll {
blockletList.add(blockletId)
blockToBlockletMap.put(carbondataFileShortName, blockletList)
// create a new AND expression with True expression as right child
-    val filterExpression = new AndExpression(scanRDD.filterExpression, new TrueExpression(null))
+    val filterExpression = new AndExpression(scanRDD.dataMapFilter.getExpression, new TrueExpression(null))
     // create implicit expression which will replace the right child (True expression)
     FilterUtil.createImplicitExpressionAndSetAsRightChild(filterExpression, blockToBlockletMap)
// update the filter expression
- scanRDD.filterExpression = filterExpression
+ scanRDD.dataMapFilter = new DataMapFilter(carbonTable, filterExpression)
// execute the query and get the result count
checkAnswer(query.toDF(), Seq(Row(expectedResultCount)))
}
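
Because CarbonScanRDD now carries a DataMapFilter rather than a bare Expression, a test that
rewrites the pushed-down filter has to unwrap the expression, extend it, and wrap it again, as
done above. The same round trip in Java (the method name is illustrative):

    import org.apache.carbondata.core.datamap.DataMapFilter;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.core.scan.expression.logical.AndExpression;
    import org.apache.carbondata.core.scan.expression.logical.TrueExpression;

    static DataMapFilter withTrueRightChild(CarbonTable table, DataMapFilter current) {
      // unwrap, AND a True expression in as the right child, then re-wrap
      Expression extended = new AndExpression(current.getExpression(), new TrueExpression(null));
      return new DataMapFilter(table, extended);
    }
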
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index cc7ba5a..34a82d8 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -33,22 +33,23 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.profiler.{GetPartition, Profiler}
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{SparkUtil, TaskCompletionListener}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.datamap.DataMapFilter
import org.apache.carbondata.core.datastore.block.Distributable
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression
import org.apache.carbondata.core.scan.filter.FilterUtil
@@ -58,8 +59,7 @@ import org.apache.carbondata.core.statusmanager.FileFormat
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -75,7 +75,7 @@ import org.apache.carbondata.spark.util.Util
class CarbonScanRDD[T: ClassTag](
@transient private val spark: SparkSession,
val columnProjection: CarbonProjection,
- var filterExpression: Expression,
+ var dataMapFilter: DataMapFilter,
identifier: AbsoluteTableIdentifier,
@transient private val serializedTableInfo: Array[Byte],
@transient private val tableInfo: TableInfo,
@@ -208,7 +208,11 @@ class CarbonScanRDD[T: ClassTag](
numBlocks,
distributeStartTime,
distributeEndTime,
-          if (filterExpression == null) "" else filterExpression.getStatement,
+          if (dataMapFilter == null) {
+            ""
+          } else {
+            dataMapFilter.getExpression.getStatement
+          },
           if (columnProjection == null) "" else columnProjection.getAllColumns.mkString(",")
)
)
@@ -424,7 +428,7 @@ class CarbonScanRDD[T: ClassTag](
TaskMetricsMap.getInstance().registerThreadCallback()
     inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
-      val model = format.createQueryModel(inputSplit, attemptContext, filterExpression)
+      val model = format.createQueryModel(inputSplit, attemptContext, dataMapFilter)
// one query id per table
model.setQueryId(queryId)
// get RecordReader by FileFormat
@@ -576,6 +580,7 @@ class CarbonScanRDD[T: ClassTag](
   def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
CarbonInputFormat.setTableInfo(conf, tableInfo)
+ CarbonInputFormat.setFilterPredicates(conf, dataMapFilter)
CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
if (partitionNames != null) {
@@ -600,6 +605,10 @@ class CarbonScanRDD[T: ClassTag](
CarbonInputFormat.setCarbonReadSupport(conf, readSupportClz)
val tableInfo1 = getTableInfo
CarbonInputFormat.setTableInfo(conf, tableInfo1)
+ if (dataMapFilter != null) {
+ dataMapFilter.setTable(CarbonTable.buildFromTableInfo(tableInfo1))
+ }
+ CarbonInputFormat.setFilterPredicates(conf, dataMapFilter)
CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
CarbonInputFormat.setDataTypeConverter(conf, dataTypeConverterClz)
@@ -611,7 +620,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
CarbonInputFormat.setQuerySegment(conf, identifier)
- CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonInputFormat.setFilterPredicates(conf, dataMapFilter)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
@@ -651,7 +660,6 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath))
CarbonInputFormat.setQuerySegment(conf, identifier)
- CarbonInputFormat.setFilterPredicates(conf, filterExpression)
CarbonInputFormat.setColumnProjection(conf, columnProjection)
CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
@@ -701,13 +709,13 @@ class CarbonScanRDD[T: ClassTag](
*/
private def checkAndRemoveInExpressinFromFilterExpression(
identifiedPartitions: mutable.Buffer[Partition]) = {
- if (null != filterExpression) {
+ if (null != dataMapFilter) {
if (identifiedPartitions.nonEmpty &&
!checkForBlockWithoutBlockletInfo(identifiedPartitions)) {
-        FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression)
+        FilterUtil.removeInExpressionNodeWithPositionIdColumn(dataMapFilter.getExpression)
       } else if (identifiedPartitions.nonEmpty) {
         // the below piece of code will serialize only the required blocklet ids
-        val filterValues = FilterUtil.getImplicitFilterExpression(filterExpression)
+        val filterValues = FilterUtil.getImplicitFilterExpression(dataMapFilter.getExpression)
         if (null != filterValues) {
           val implicitExpression = filterValues.asInstanceOf[ImplicitExpression]
identifiedPartitions.foreach { partition =>
@@ -730,7 +738,7 @@ class CarbonScanRDD[T: ClassTag](
}
           // remove the right child of the expression here to prevent serialization of
// implicit filter values to executor
- FilterUtil.setTrueExpressionAsRightChild(filterExpression)
+ FilterUtil.setTrueExpressionAsRightChild(dataMapFilter.getExpression)
}
}
}
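
Note the asymmetry between the two prepare paths above: the driver registers the filter as-is,
while the executor first rebuilds the CarbonTable from the serialized TableInfo and re-binds the
filter to it via setTable. That step in isolation (a sketch):

    import org.apache.carbondata.core.datamap.DataMapFilter;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    import org.apache.carbondata.hadoop.api.CarbonInputFormat;
    import org.apache.hadoop.conf.Configuration;

    static void rebindFilter(Configuration conf, TableInfo tableInfo, DataMapFilter filter) {
      if (filter != null) {
        // point the deserialized filter at a table object rebuilt on this JVM
        filter.setTable(CarbonTable.buildFromTableInfo(tableInfo));
      }
      CarbonInputFormat.setFilterPredicates(conf, filter);
    }
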
diff --git
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index b6b4e8d..879addd 100644
---
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
@@ -650,6 +650,9 @@ class TableNewProcessor(cm: TableModel) {
columnRelation.parentDatabaseName,
columnRelation.parentTableName,
columnRelation.parentTableId)
+ if (cm.parentTable.isDefined) {
+ relationIdentifier.setTablePath(cm.parentTable.get.getTablePath)
+ }
val parentColumnTableRelation = new ParentColumnTableRelation(
relationIdentifier,
columnRelation.parentColumnId,
diff --git
a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 0fb8b4b..ece790d 100644
---
a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++
b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -328,9 +328,9 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
*/
private boolean isUseLazyLoad() {
boolean useLazyLoad = false;
- if (queryModel.getFilterExpressionResolverTree() != null) {
+ if (queryModel.getDataMapFilter() != null) {
Expression expression =
- queryModel.getFilterExpressionResolverTree().getFilterExpression();
+ queryModel.getDataMapFilter().getExpression();
useLazyLoad = true;
       // In case of join queries only not null filter would be pushed down so check and disable the
// lazy load in that case.
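
The reader now reaches the pushed-down expression through the QueryModel's DataMapFilter rather
than the filter resolver tree. Null-guarded access, sketched:

    import org.apache.carbondata.core.datamap.DataMapFilter;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.core.scan.model.QueryModel;

    static Expression pushedFilterOf(QueryModel queryModel) {
      DataMapFilter filter = queryModel.getDataMapFilter();
      // the lazy-load decision above keys off this expression; no filter
      // means lazy load stays disabled
      return filter == null ? null : filter.getExpression();
    }
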
diff --git
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index f89df36..9306106 100644
---
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.carbondata.execution.datasources
-import java.io.IOException
import java.util
import scala.collection.JavaConverters._
@@ -31,6 +30,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{AtomicType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
 import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
@@ -127,7 +127,9 @@ class CarbonFileIndex(
hadoopConf,
new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
filter match {
- case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+ case Some(c) => CarbonInputFormat
+ .setFilterPredicates(hadoopConf,
+          new DataMapFilter(model.getCarbonDataLoadSchema.getCarbonTable, c, true))
case None => None
}
     val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
diff --git
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 1556b46..52f98e8 100644
---
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++
b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -21,7 +21,6 @@ import java.net.URI
import java.util.UUID
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -34,7 +33,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql._
 import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
-import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListener, CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListener, CarbonQueryTaskCompletionListenerImpl}
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListenerImpl}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -43,12 +42,13 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkTypeConverter
-import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener}
+import org.apache.spark.util.SerializableConfiguration
 import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapFilter
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
@@ -379,7 +379,9 @@ class SparkCarbonFileFormat extends FileFormat
CarbonInputFormat.setTransactionalTable(hadoopConf, false)
CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
filter match {
- case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+ case Some(c) => CarbonInputFormat
+ .setFilterPredicates(hadoopConf,
+          new DataMapFilter(model.getCarbonDataLoadSchema.getCarbonTable, c, true))
case None => None
}
     val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
index e626e63..8a67356 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/store/SparkCarbonStore.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.core.datamap.DataMapFilter
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.Expression
@@ -78,10 +79,11 @@ class SparkCarbonStore extends MetaCachedCarbonStore {
require(projectColumns != null)
val table = CarbonEnv
       .getCarbonTable(Some(tableIdentifier.getDatabaseName), tableIdentifier.getTableName)(session)
+    val dataMapFilter = if (filter == null) null else new DataMapFilter(table, filter)
val rdd = new CarbonScanRDD[CarbonRow](
spark = session,
columnProjection = new CarbonProjection(projectColumns),
- filterExpression = filter,
+ dataMapFilter = dataMapFilter,
identifier = table.getAbsoluteTableIdentifier,
serializedTableInfo = table.getTableInfo.serialize,
tableInfo = table.getTableInfo,
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index cfb6e6e..d1b948e 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -29,10 +29,11 @@ import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
-import org.apache.spark.sql.types.{ArrayType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapFilter
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.hadoop.CarbonProjection
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, SparkReadSupport}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
case class CarbonDatasourceHadoopRelation(
sparkSession: SparkSession,
@@ -184,7 +185,7 @@ case class CarbonDatasourceHadoopRelation(
new CarbonScanRDD(
sparkSession,
projection,
- filterExpression.orNull,
+ filterExpression.map(new DataMapFilter(carbonTable, _, true)).orNull,
identifier,
carbonTable.getTableInfo.serialize(),
carbonTable.getTableInfo,
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index e9025ce..176c969 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -227,6 +227,7 @@ case class CarbonAddLoadCommand(
model.setDatabaseName(carbonTable.getDatabaseName)
model.setTableName(carbonTable.getTableName)
val operationContext = new OperationContext
+ operationContext.setProperty("isLoadOrCompaction", false)
val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
new LoadTablePreExecutionEvent(
carbonTable.getCarbonTableIdentifier,
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 3f6b91d..c0d1605 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.strategy
+import java.util
+
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index d7769bb..44495ce 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
@@ -129,7 +130,8 @@ public class CarbonCompactionExecutor {
           new QueryModelBuilder(carbonTable).projectAllColumns().dataConverter(dataTypeConverter)
               .enableForcedDetailRawQuery();
     } else {
-      builder = new QueryModelBuilder(carbonTable).projectAllColumns().filterExpression(filterExpr)
+ builder = new QueryModelBuilder(carbonTable).projectAllColumns()
+ .filterExpression(new DataMapFilter(carbonTable, filterExpr))
.dataConverter(dataTypeConverter).enableForcedDetailRawQuery()
.convertToRangeFilter(false);
}
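
Compaction wraps its filter the same way before handing it to the query-model builder. The full
chain for the filtered branch, sketched (assumes a non-null filterExpr; taking the converter
type to be core's DataTypeConverter is an assumption):

    import org.apache.carbondata.core.datamap.DataMapFilter;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.core.scan.model.QueryModelBuilder;
    import org.apache.carbondata.core.util.DataTypeConverter;

    static QueryModelBuilder compactionBuilder(CarbonTable carbonTable, Expression filterExpr,
        DataTypeConverter dataTypeConverter) {
      // identical to the '+' lines above: the raw expression is wrapped once,
      // and range-filter conversion is switched off for compaction
      return new QueryModelBuilder(carbonTable).projectAllColumns()
          .filterExpression(new DataMapFilter(carbonTable, filterExpr))
          .dataConverter(dataTypeConverter).enableForcedDetailRawQuery()
          .convertToRangeFilter(false);
    }
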
diff --git
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 78f88be..c16a66c 100644
---
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -287,7 +288,8 @@ public class CarbonReaderBuilder {
format.setTableName(job.getConfiguration(), table.getTableName());
format.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
if (filterExpression != null) {
- format.setFilterPredicates(job.getConfiguration(), filterExpression);
+ format.setFilterPredicates(job.getConfiguration(),
+ new DataMapFilter(table, filterExpression, true));
}
if (null != this.fileLists) {
format.setFileLists(this.fileLists);
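
For SDK users the builder API is unchanged; the wrapping happens inside the builder. A typical
reader setup still looks roughly like this (a sketch; the path, table name, columns and filter
values are illustrative):

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.scan.expression.ColumnExpression;
    import org.apache.carbondata.core.scan.expression.LiteralExpression;
    import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
    import org.apache.carbondata.sdk.file.CarbonReader;

    static CarbonReader openReader(String path) throws Exception {
      // equality filter on a string column; per the hunk above the builder
      // turns this into a DataMapFilter before calling the input format
      EqualToExpression filter = new EqualToExpression(
          new ColumnExpression("name", DataTypes.STRING),
          new LiteralExpression("carbon", DataTypes.STRING));
      return CarbonReader.builder(path, "_temp")
          .projection(new String[]{"name", "age"})
          .filter(filter)
          .build();
    }
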
diff --git
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index b68be3f..bc6228a 100644
---
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.sdk.file;
-import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -28,22 +27,24 @@ import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonFooterReaderV3;
import org.apache.carbondata.core.reader.CarbonHeaderReader;
-import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.FileFooter3;
+import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
 import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 import static org.apache.carbondata.core.util.path.CarbonTablePath.CARBON_DATA_EXT;
 import static org.apache.carbondata.core.util.path.CarbonTablePath.INDEX_FILE_EXT;
+import static org.apache.carbondata.core.util.path.CarbonTablePath.MERGE_INDEX_FILE_EXT;
import org.apache.hadoop.conf.Configuration;
@@ -193,7 +194,14 @@ public class CarbonSchemaReader {
       throw new CarbonDataLoadingException("No carbonindex file in this path.");
}
} else {
-      String indexFilePath = getCarbonFile(path, INDEX_FILE_EXT, conf)[0].getAbsolutePath();
+      String indexFilePath;
+      indexFilePath = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter() {
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(INDEX_FILE_EXT) || file.getName()
+ .endsWith(MERGE_INDEX_FILE_EXT);
+ }
+ })[0].getAbsolutePath();
return readSchemaFromIndexFile(indexFilePath, conf);
}
}
@@ -284,37 +292,17 @@ public class CarbonSchemaReader {
* @return carbon data Schema
* @throws IOException
*/
-  private static Schema readSchemaFromIndexFile(String indexFilePath, Configuration conf)
- throws IOException {
- CarbonFile indexFile =
- FileFactory.getCarbonFile(indexFilePath, conf);
- if (!indexFile.getName().endsWith(INDEX_FILE_EXT)) {
- throw new IOException("Not an index file name");
- }
- // read schema from the first index file
- DataInputStream dataInputStream =
-        FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath), conf);
- byte[] bytes = new byte[(int) indexFile.getSize()];
- try {
- //get the file in byte buffer
- dataInputStream.readFully(bytes);
- CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
- // read from byte buffer.
- indexReader.openThriftReader(bytes);
- // get the index header
-      org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
- List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
- List<org.apache.carbondata.format.ColumnSchema> table_columns =
- readIndexHeader.getTable_columns();
-      for (org.apache.carbondata.format.ColumnSchema columnSchema : table_columns) {
- if (!(columnSchema.column_name.contains("."))) {
-          columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(columnSchema));
- }
+  private static Schema readSchemaFromIndexFile(String indexFilePath, Configuration conf) {
+    IndexHeader readIndexHeader = SegmentIndexFileStore.readIndexHeader(indexFilePath, conf);
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns =
+ readIndexHeader.getTable_columns();
+    for (org.apache.carbondata.format.ColumnSchema columnSchema : table_columns) {
+ if (!(columnSchema.column_name.contains("."))) {
+      columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(columnSchema));
}
- return new Schema(columnSchemaList);
- } finally {
- dataInputStream.close();
}
+ return new Schema(columnSchemaList);
}
/**
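
readSchemaFromIndexFile is thus reduced to a single SegmentIndexFileStore.readIndexHeader call
plus the column loop, and since the header reader copes with merge-index files the old
"Not an index file name" guard goes away. Schema discovery keeps its public shape; for example
(a sketch, assuming the one-argument readSchema overload of the SDK):

    import java.io.IOException;

    import org.apache.carbondata.sdk.file.CarbonSchemaReader;
    import org.apache.carbondata.sdk.file.Schema;

    static Schema readStoreSchema(String storePath) throws IOException {
      // a folder holding either a .carbonindex or a .carbonindexmerge file
      // now resolves through the same code path
      return CarbonSchemaReader.readSchema(storePath);
    }
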
diff --git
a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
index 307f64f..abceee1 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.DataMapFilter;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -83,7 +84,8 @@ class LocalCarbonStore extends MetaCachedCarbonStore {
CarbonInputFormat
         .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns));
if (filter != null) {
- CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
+ CarbonInputFormat
+          .setFilterPredicates(job.getConfiguration(), new DataMapFilter(table, filter));
}
final List<InputSplit> splits =